13365: Further fixes to ensure output callback only happens once
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 3 May 2018 19:48:16 +0000 (15:48 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 3 May 2018 19:48:16 +0000 (15:48 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/runner.py

index 8b31ece07be26010c6d2e3b81572bed513f00cfb..b5c91956a3aa03d31dcc12a1dd530d530febfe95 100644 (file)
@@ -157,10 +157,11 @@ class ArvCwlRunner(object):
         with self.workflow_eval_lock:
             self.processes[container.uuid] = container
 
-    def process_done(self, uuid):
+    def process_done(self, uuid, record):
         with self.workflow_eval_lock:
-            if uuid in self.processes:
-                del self.processes[uuid]
+            j = self.processes[uuid]
+            self.task_queue.add(partial(j.done, record))
+            del self.processes[uuid]
 
     def wrapped_callback(self, cb, obj, st):
         with self.workflow_eval_lock:
@@ -181,10 +182,8 @@ class ArvCwlRunner(object):
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                         logger.info("%s %s is Running", self.label(j), uuid)
             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                with self.workflow_eval_lock:
-                    j = self.processes[uuid]
-                self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
                 logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
+                self.process_done(uuid, event["properties"]["new_attributes"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
index 9588803e7317cb844115ecdb2ffd68094cbdf2bf..4e7811d2e8f5b0b477b82334b79385618c3456b9 100644 (file)
@@ -316,7 +316,6 @@ class ArvadosContainer(object):
             processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            self.arvrunner.process_done(record["uuid"])
 
 
 class RunnerContainer(Runner):
@@ -464,5 +463,3 @@ class RunnerContainer(Runner):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             super(RunnerContainer, self).done(container)
-        finally:
-            self.arvrunner.process_done(record["uuid"])
index 801be7d1f2d635e289246ba31496b6f7bdccd071..04256c68f8b10f47ede2fefcabb0172948c2ff00 100644 (file)
@@ -264,7 +264,6 @@ class ArvadosJob(object):
                 processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            self.arvrunner.process_done(record["uuid"])
 
 
 class RunnerJob(Runner):
index 6491933f2c5548330d0c75582a179c8e747367e6..8db3e61cf152306a570fcdd7af81b5b56fee3405 100644 (file)
@@ -431,5 +431,3 @@ class Runner(object):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
-        finally:
-            self.arvrunner.process_done(record["uuid"])