19688: Try a fast-path submit for registered workflows
author Peter Amstutz <peter.amstutz@curii.com>
Wed, 2 Nov 2022 01:34:35 +0000 (21:34 -0400)
committer Peter Amstutz <peter.amstutz@curii.com>
Wed, 2 Nov 2022 01:34:35 +0000 (21:34 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/executor.py

index 08a05d571cb8e41bb48265489fcec9f13b1e6100..9200c5caa50589fb3cbdd5e9db1354a9d7fd8d47 100644 (file)
@@ -359,6 +359,10 @@ def main(args=sys.argv[1:],
         # unit tests.
         stdout = None
 
+    if arvargs.submit and arvargs.wait is False and arvargs.workflow.startswith("arvwf:"):
+        executor.loadingContext.do_validate = False
+        executor.fast_submit = True
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
index 66fe143e0f42d169d29ec04e85ddd50a6702f1c9..57ab7367a34bb2a176454ed42a9f98aaae71f166 100644 (file)
@@ -523,6 +523,15 @@ class RunnerContainer(Runner):
                 "kind": "collection",
                 "portable_data_hash": "%s" % workflowcollection
             }
+        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+            workflowpath = "/var/lib/cwl/workflow.json#main"
+            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+            packed = yaml.safe_load(record["definition"])
+            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+                "kind": "json",
+                "content": packed
+            }
+            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
         else:
             packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
             workflowpath = "/var/lib/cwl/workflow.json#main"
@@ -530,8 +539,6 @@ class RunnerContainer(Runner):
                 "kind": "json",
                 "content": packed
             }
-            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
 
         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
 
index 5f3feabf8c83271ccec89d667d296663b86fecfa..02c9c7a97ac3e30aecff9dfcc221877dbc7fe4b0 100644 (file)
@@ -147,8 +147,13 @@ class ArvadosWorkflowStep(WorkflowStep):
                  **argv
                 ):  # type: (...) -> None
 
-        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
-        self.tool["class"] = "WorkflowStep"
+        if arvrunner.fast_submit:
+            self.tool = toolpath_object
+            self.tool["inputs"] = []
+            self.tool["outputs"] = []
+        else:
+            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+            self.tool["class"] = "WorkflowStep"
         self.arvrunner = arvrunner
 
     def job(self, joborder, output_callback, runtimeContext):
index 694f77baf246ecb56e6116e99dd5461deb9f6e53..f95ac17e1feaf3b99fc1af5384594ccfe665660e 100644 (file)
@@ -137,6 +137,7 @@ class ArvCwlExecutor(object):
         self.fs_access = None
         self.secret_store = None
         self.stdout = stdout
+        self.fast_submit = False
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -594,7 +595,8 @@ The 'jobs' API is no longer supported.
         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
 
-        updated_tool.visit(self.check_features)
+        if not self.fast_submit:
+            updated_tool.visit(self.check_features)
 
         self.pipeline = None
         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
@@ -662,7 +664,7 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
         loadingContext.disable_js_validation = True
-        if submitting:
+        if submitting and not self.fast_submit:
             loadingContext.do_update = False
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
@@ -675,9 +677,12 @@ The 'jobs' API is no longer supported.
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        logger.info("Uploading workflow dependencies")
-        with Perf(metrics, "upload_workflow_deps"):
-            merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        if not self.fast_submit:
+            logger.info("Uploading workflow dependencies")
+            with Perf(metrics, "upload_workflow_deps"):
+                merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        else:
+            merged_map = {}
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been