#
# SPDX-License-Identifier: Apache-2.0
+from past.builtins import basestring
+from future.utils import viewitems
+
import os
import json
import copy
from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.builder import Builder
from cwltool.context import LoadingContext
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
- trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
+ trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
+ make_builder)
from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, check_cluster_target
+from .arvtool import ArvadosCommandTool, set_cluster_target
+
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
return cmap(overall_res_req)
class ArvadosWorkflowStep(WorkflowStep):
+ def __init__(self,
+ toolpath_object, # type: Dict[Text, Any]
+ pos, # type: int
+ loadingContext, # type: LoadingContext
+ arvrunner,
+ *argc,
+ **argv
+ ): # type: (...) -> None
+
+ super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+ self.tool["class"] = "WorkflowStep"
+ self.arvrunner = arvrunner
+
def job(self, joborder, output_callback, runtimeContext):
- builder = self._init_job({shortname(k): v for k,v in joborder.items()}, runtimeContext)
- check_cluster_target(self, builder, runtimeContext)
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.toplevel = True # Preserve behavior for #13365
+
+ builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements, runtimeContext)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
def __init__(self, arvrunner, toolpath_object, loadingContext):
- super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
self.loadingContext = loadingContext
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+ self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
def job(self, joborder, output_callback, runtimeContext):
- builder = self._init_job(joborder, runtimeContext)
- check_cluster_target(self, builder, runtimeContext)
+ builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if not req:
raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
- discover_secondary_files(self.tool["inputs"], joborder)
+ discover_secondary_files(self.arvrunner.fs_access, builder,
+ self.tool["inputs"], joborder)
with Perf(metrics, "subworkflow upload_deps"):
upload_dependencies(self.arvrunner,
raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
if not dyn:
self.static_resource_req.append(req)
+ if req["class"] == "DockerRequirement":
+ if "http://arvados.org/cwl#dockerCollectionPDH" in req:
+ del req["http://arvados.org/cwl#dockerCollectionPDH"]
visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
adjustDirObjs(packed, keepmount)
self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.loadingContext = self.loadingContext.copy()
+ self.loadingContext.metadata = self.loadingContext.metadata.copy()
+ self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
+
+ if len(job_res_reqs) == 1:
+ # RAM request needs to be at least 128 MiB or the workflow
+ # runner itself won't run reliably.
+ if job_res_reqs[0].get("ramMin", 1024) < 128:
+ job_res_reqs[0]["ramMin"] = 128
+
wf_runner = cmap({
"class": "CommandLineTool",
"baseCommand": "cwltool",
toolpath_object, # type: Dict[Text, Any]
pos, # type: int
loadingContext, # type: LoadingContext
- parentworkflowProv=None # type: Optional[CreateProvProfile]
+ *argc,
+ **argv
):
# (...) -> WorkflowStep
- return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, parentworkflowProv)
+ return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)