From d825b0330a1b51d8ccbb25e7dc7d9aac26e781e0 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 1 Nov 2022 21:34:35 -0400 Subject: [PATCH] 19688: Trying a fast path submit for registered workflows Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/__init__.py | 4 ++++ sdk/cwl/arvados_cwl/arvcontainer.py | 11 +++++++++-- sdk/cwl/arvados_cwl/arvworkflow.py | 9 +++++++-- sdk/cwl/arvados_cwl/executor.py | 15 ++++++++++----- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 08a05d571c..9200c5caa5 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -359,6 +359,10 @@ def main(args=sys.argv[1:], # unit tests. stdout = None + if arvargs.submit and arvargs.wait is False and arvargs.workflow.startswith("arvwf:"): + executor.loadingContext.do_validate = False + executor.fast_submit = True + return cwltool.main.main(args=arvargs, stdout=stdout, stderr=stderr, diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 66fe143e0f..57ab7367a3 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -523,6 +523,15 @@ class RunnerContainer(Runner): "kind": "collection", "portable_data_hash": "%s" % workflowcollection } + elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + workflowpath = "/var/lib/cwl/workflow.json#main" + record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries) + packed = yaml.safe_load(record["definition"]) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "content": packed + } + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] else: packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info) workflowpath = "/var/lib/cwl/workflow.json#main" @@ -530,8 +539,6 @@ class RunnerContainer(Runner): "kind": "json", "content": packed } - if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()}) diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 5f3feabf8c..02c9c7a97a 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -147,8 +147,13 @@ class ArvadosWorkflowStep(WorkflowStep): **argv ): # type: (...) -> None - super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv) - self.tool["class"] = "WorkflowStep" + if arvrunner.fast_submit: + self.tool = toolpath_object + self.tool["inputs"] = [] + self.tool["outputs"] = [] + else: + super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv) + self.tool["class"] = "WorkflowStep" self.arvrunner = arvrunner def job(self, joborder, output_callback, runtimeContext): diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 694f77baf2..f95ac17e1f 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -137,6 +137,7 @@ class ArvCwlExecutor(object): self.fs_access = None self.secret_store = None self.stdout = stdout + self.fast_submit = False if keep_client is not None: self.keep_client = keep_client @@ -594,7 +595,8 @@ The 'jobs' API is no longer supported. controller = self.api.config()["Services"]["Controller"]["ExternalURL"] logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller) - updated_tool.visit(self.check_features) + if not self.fast_submit: + updated_tool.visit(self.check_features) self.pipeline = None self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir) @@ -662,7 +664,7 @@ The 'jobs' API is no longer supported. loadingContext = self.loadingContext.copy() loadingContext.do_validate = False loadingContext.disable_js_validation = True - if submitting: + if submitting and not self.fast_submit: loadingContext.do_update = False # Document may have been auto-updated. Reload the original # document with updating disabled because we want to @@ -675,9 +677,12 @@ The 'jobs' API is no longer supported. # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. - logger.info("Uploading workflow dependencies") - with Perf(metrics, "upload_workflow_deps"): - merged_map = upload_workflow_deps(self, tool, runtimeContext) + if not self.fast_submit: + logger.info("Uploading workflow dependencies") + with Perf(metrics, "upload_workflow_deps"): + merged_map = upload_workflow_deps(self, tool, runtimeContext) + else: + merged_map = {} # Recreate process object (ArvadosWorkflow or # ArvadosCommandTool) because tool document may have been -- 2.30.2