11162: Add --submit-request-uuid
[arvados.git] / sdk / cwl / arvados_cwl / arvjob.py
index 88155b5b958df8e0b7b04a602becccd772718bc3..04256c68f8b10f47ede2fefcabb0172948c2ff00 100644 (file)
@@ -134,6 +134,8 @@ class ArvadosJob(object):
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             with Perf(metrics, "create %s" % self.name):
                 response = self.arvrunner.api.jobs().create(
@@ -150,7 +152,8 @@ class ArvadosJob(object):
                     find_or_create=enable_reuse
                 ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.processes[response["uuid"]] = self
+            self.uuid = response["uuid"]
+            self.arvrunner.process_submitted(self)
 
             self.update_pipeline_component(response)
 
@@ -171,9 +174,6 @@ class ArvadosJob(object):
                         logger.info("Creating read permission on job %s: %s",
                                     response["uuid"],
                                     e)
-
-                with Perf(metrics, "done %s" % self.name):
-                    self.done(response)
             else:
                 logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
@@ -181,27 +181,28 @@ class ArvadosJob(object):
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            with Perf(metrics, "update_pipeline_component %s" % self.name):
-                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
-                    uuid=self.arvrunner.pipeline["uuid"],
-                    body={
-                        "components": self.arvrunner.pipeline["components"]
-                    }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(
-                        uuid=self.arvrunner.uuid,
+        with self.arvrunner.workflow_eval_lock:
+            if self.arvrunner.pipeline:
+                self.arvrunner.pipeline["components"][self.name] = {"job": record}
+                with Perf(metrics, "update_pipeline_component %s" % self.name):
+                    self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+                        uuid=self.arvrunner.pipeline["uuid"],
                         body={
-                            "components": components
+                            "components": self.arvrunner.pipeline["components"]
                         }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
+            if self.arvrunner.uuid:
+                try:
+                    job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                    if job:
+                        components = job["components"]
+                        components[self.name] = record["uuid"]
+                        self.arvrunner.api.jobs().update(
+                            uuid=self.arvrunner.uuid,
+                            body={
+                                "components": components
+                            }).execute(num_retries=self.arvrunner.num_retries)
+                except Exception as e:
+                    logger.info("Error adding to components: %s", e)
 
     def done(self, record):
         try:
@@ -263,8 +264,7 @@ class ArvadosJob(object):
                 processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -314,8 +314,8 @@ class RunnerJob(Runner):
             }
         }
 
-    def run(self, *args, **kwargs):
-        job_spec = self.arvados_job_spec(*args, **kwargs)
+    def run(self, **kwargs):
+        job_spec = self.arvados_job_spec(**kwargs)
 
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
@@ -351,10 +351,7 @@ class RunnerJob(Runner):
             return
 
         self.uuid = job["uuid"]
-        self.arvrunner.processes[self.uuid] = self
-
-        if job["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(job)
+        self.arvrunner.process_submitted(self)
 
 
 class RunnerTemplate(object):