15358: Add encoding option to CollectionFsAccess.open() and use in fetch_text()
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index f06d569bf3219429ae61ac04744051504aa8975a..c6bcd04776920186d9973bdf662c1fe060b9b8c2 100644 (file)
@@ -2,6 +2,9 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from past.builtins import basestring
+from future.utils import viewitems
+
 import os
 import json
 import copy
@@ -14,15 +17,16 @@ from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.builder import Builder
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
-                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
+                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
+                     make_builder)
 from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, check_cluster_target
+from .arvtool import ArvadosCommandTool, set_cluster_target
+
 from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -123,34 +127,41 @@ class ArvadosWorkflowStep(WorkflowStep):
                  toolpath_object,      # type: Dict[Text, Any]
                  pos,                  # type: int
                  loadingContext,       # type: LoadingContext
+                 arvrunner,
                  *argc,
                  **argv
                 ):  # type: (...) -> None
 
         super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
         self.tool["class"] = "WorkflowStep"
+        self.arvrunner = arvrunner
 
     def job(self, joborder, output_callback, runtimeContext):
-        builder = self._init_job({shortname(k): v for k,v in joborder.items()}, runtimeContext)
-        check_cluster_target(self, builder, runtimeContext)
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.toplevel = True  # Preserve behavior for #13365
+
+        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
 
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
     def __init__(self, arvrunner, toolpath_object, loadingContext):
-        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
         self.arvrunner = arvrunner
         self.wf_pdh = None
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
         self.loadingContext = loadingContext
+        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
 
     def job(self, joborder, output_callback, runtimeContext):
 
-        builder = self._init_job(joborder, runtimeContext)
-        check_cluster_target(self, builder, runtimeContext)
+        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if not req:
@@ -163,7 +174,8 @@ class ArvadosWorkflow(Workflow):
                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
         document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
 
-        discover_secondary_files(self.tool["inputs"], joborder)
+        discover_secondary_files(self.arvrunner.fs_access, builder,
+                                 self.tool["inputs"], joborder)
 
         with Perf(metrics, "subworkflow upload_deps"):
             upload_dependencies(self.arvrunner,
@@ -199,6 +211,9 @@ class ArvadosWorkflow(Workflow):
                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                 if not dyn:
                                     self.static_resource_req.append(req)
+                            if req["class"] == "DockerRequirement":
+                                if "http://arvados.org/cwl#dockerCollectionPDH" in req:
+                                    del req["http://arvados.org/cwl#dockerCollectionPDH"]
 
                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
 
@@ -276,6 +291,10 @@ class ArvadosWorkflow(Workflow):
                 adjustDirObjs(packed, keepmount)
                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
 
+        self.loadingContext = self.loadingContext.copy()
+        self.loadingContext.metadata = self.loadingContext.metadata.copy()
+        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
+
         wf_runner = cmap({
             "class": "CommandLineTool",
             "baseCommand": "cwltool",
@@ -308,4 +327,4 @@ class ArvadosWorkflow(Workflow):
                            **argv
     ):
         # (...) -> WorkflowStep
-        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, *argc, **argv)
+        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)