19688: Trying a fast path submit for registered workflows
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index 4fe82a6fe1d6fc32f709dd909577da7010970e07..02c9c7a97ac3e30aecff9dfcc221877dbc7fe4b0 100644 (file)
@@ -37,11 +37,13 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
 sum_res_pars = ("outdirMin", "outdirMax")
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+                    runtimeContext, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None,
-                    submit_runner_image=None):
+                    submit_runner_image=None,
+                    git_info=None):
 
-    packed = packed_workflow(arvRunner, tool, merged_map)
+    packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
@@ -57,7 +59,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
     upload_dependencies(arvRunner, name, tool.doc_loader,
-                        packed, tool.tool["id"], False)
+                        packed, tool.tool["id"], False,
+                        runtimeContext)
 
     wf_runner_resources = None
 
@@ -72,7 +75,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
         hints.append(wf_runner_resources)
 
-    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+    wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+                                                                  submit_runner_image or "arvados/jobs:"+__version__,
+                                                                  runtimeContext)
 
     if submit_runner_ram:
         wf_runner_resources["ramMin"] = submit_runner_ram
@@ -142,8 +147,13 @@ class ArvadosWorkflowStep(WorkflowStep):
                  **argv
                 ):  # type: (...) -> None
 
-        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
-        self.tool["class"] = "WorkflowStep"
+        if arvrunner.fast_submit:
+            self.tool = toolpath_object
+            self.tool["inputs"] = []
+            self.tool["outputs"] = []
+        else:
+            super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+            self.tool["class"] = "WorkflowStep"
         self.arvrunner = arvrunner
 
     def job(self, joborder, output_callback, runtimeContext):
@@ -194,7 +204,8 @@ class ArvadosWorkflow(Workflow):
                                 self.doc_loader,
                                 joborder,
                                 joborder.get("id", "#"),
-                                False)
+                                False,
+                                runtimeContext)
 
             if self.wf_pdh is None:
                 packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
@@ -237,7 +248,8 @@ class ArvadosWorkflow(Workflow):
                                     self.doc_loader,
                                     packed,
                                     self.tool["id"],
-                                    False)
+                                    False,
+                                    runtimeContext)
 
                 # Discover files/directories referenced by the
                 # workflow (mainly "default" values)
@@ -301,7 +313,7 @@ class ArvadosWorkflow(Workflow):
             if self.wf_pdh is None:
                 adjustFileObjs(packed, keepmount)
                 adjustDirObjs(packed, keepmount)
-                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+                self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
 
         self.loadingContext = self.loadingContext.copy()
         self.loadingContext.metadata = self.loadingContext.metadata.copy()