X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e45d6feabba025220783beea6caed60cd1e7d782..d825b0330a1b51d8ccbb25e7dc7d9aac26e781e0:/sdk/cwl/arvados_cwl/arvworkflow.py diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 97c5fafe79..02c9c7a97a 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -17,16 +17,17 @@ from cwltool.pack import pack from cwltool.load_tool import fetch_document, resolve_and_validate_document from cwltool.process import shortname from cwltool.workflow import Workflow, WorkflowException, WorkflowStep -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class +from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs from cwltool.context import LoadingContext import ruamel.yaml as yaml from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection, trim_anonymous_location, remove_redundant_fields, discover_secondary_files, - make_builder) + make_builder, arvados_jobs_image) from .pathmapper import ArvPathMapper, trim_listing from .arvtool import ArvadosCommandTool, set_cluster_target +from ._version import __version__ from .perf import Perf @@ -36,10 +37,13 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics') max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax") sum_res_pars = ("outdirMin", "outdirMax") -def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, - submit_runner_ram=0, name=None, merged_map=None): +def upload_workflow(arvRunner, tool, job_order, project_uuid, + runtimeContext, uuid=None, + submit_runner_ram=0, name=None, merged_map=None, + submit_runner_image=None, + git_info=None): - packed = packed_workflow(arvRunner, tool, merged_map) + packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info) adjustDirObjs(job_order, trim_listing) adjustFileObjs(job_order, trim_anonymous_location) @@ -55,20 +59,30 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, name = tool.tool.get("label", os.path.basename(tool.tool["id"])) upload_dependencies(arvRunner, name, tool.doc_loader, - packed, tool.tool["id"], False) + packed, tool.tool["id"], False, + runtimeContext) + + wf_runner_resources = None + + hints = main.get("hints", []) + found = False + for h in hints: + if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources": + wf_runner_resources = h + found = True + break + if not found: + wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"} + hints.append(wf_runner_resources) + + wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, + submit_runner_image or "arvados/jobs:"+__version__, + runtimeContext) if submit_runner_ram: - hints = main.get("hints", []) - found = False - for h in hints: - if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources": - h["ramMin"] = submit_runner_ram - found = True - break - if not found: - hints.append({"class": "http://arvados.org/cwl#WorkflowRunnerResources", - "ramMin": submit_runner_ram}) - main["hints"] = hints + wf_runner_resources["ramMin"] = submit_runner_ram + + main["hints"] = hints body = { "workflow": { @@ -133,8 +147,13 @@ class ArvadosWorkflowStep(WorkflowStep): **argv ): # type: (...) -> None - super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv) - self.tool["class"] = "WorkflowStep" + if arvrunner.fast_submit: + self.tool = toolpath_object + self.tool["inputs"] = [] + self.tool["outputs"] = [] + else: + super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv) + self.tool["class"] = "WorkflowStep" self.arvrunner = arvrunner def job(self, joborder, output_callback, runtimeContext): @@ -177,6 +196,7 @@ class ArvadosWorkflow(Workflow): discover_secondary_files(self.arvrunner.fs_access, builder, self.tool["inputs"], joborder) + normalizeFilesDirs(joborder) with Perf(metrics, "subworkflow upload_deps"): upload_dependencies(self.arvrunner, @@ -184,7 +204,8 @@ class ArvadosWorkflow(Workflow): self.doc_loader, joborder, joborder.get("id", "#"), - False) + False, + runtimeContext) if self.wf_pdh is None: packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader) @@ -227,7 +248,8 @@ class ArvadosWorkflow(Workflow): self.doc_loader, packed, self.tool["id"], - False) + False, + runtimeContext) # Discover files/directories referenced by the # workflow (mainly "default" values) @@ -291,7 +313,7 @@ class ArvadosWorkflow(Workflow): if self.wf_pdh is None: adjustFileObjs(packed, keepmount) adjustDirObjs(packed, keepmount) - self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed) + self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext) self.loadingContext = self.loadingContext.copy() self.loadingContext.metadata = self.loadingContext.metadata.copy()