13108: Fix tracking tasks in flight
author Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 6 Apr 2018 15:21:56 +0000 (11:21 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 6 Apr 2018 15:21:56 +0000 (11:21 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py

index 0f916ee485e8049577ad65b89fc3fba1f1766ec9..16f1bf473a34a09530636b2ab7f957a3add36672 100644 (file)
@@ -144,6 +144,7 @@ class ArvCwlRunner(object):
                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
             self.final_status = processStatus
             self.final_output = out
+            self.workflow_eval_lock.notifyAll()
 
     def task_queue_func(self):
         while True:
@@ -151,22 +152,23 @@ class ArvCwlRunner(object):
             if task is None:
                 return
             task()
+            with self.workflow_eval_lock:
+                self.in_flight -= 1
 
     def task_queue_add(self, task):
-        if self.thread_count > 1:
-            self.task_queue.put(task)
-        else:
-            task()
+        with self.workflow_eval_lock:
+            if self.thread_count > 1:
+                self.in_flight += 1
+                self.task_queue.put(task)
+            else:
+                task()
 
     def start_run(self, runnable, kwargs):
-        with self.workflow_eval_lock:
-            self.in_flight += 1
         self.task_queue_add(partial(runnable.run, **kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
             self.processes[container.uuid] = container
-            self.in_flight -= 1
 
     def process_done(self, uuid):
         with self.workflow_eval_lock:
@@ -176,6 +178,7 @@ class ArvCwlRunner(object):
     def wrapped_callback(self, cb, obj, st):
         with self.workflow_eval_lock:
             cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
 
     def get_wrapped_callback(self, cb):
         return partial(self.wrapped_callback, cb)
@@ -183,24 +186,19 @@ class ArvCwlRunner(object):
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
+                uuid = event["object_uuid"]
+                if event["properties"]["new_attributes"]["state"] == "Running":
                     with self.workflow_eval_lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
-                        j.running = True
-                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        if j.running is False:
+                            j.running = True
+                            j.update_pipeline_component(event["properties"]["new_attributes"])
+                            logger.info("%s %s is Running", self.label(j), uuid)
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
                     with self.workflow_eval_lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                    def done_cb():
-                        j.done(event["properties"]["new_attributes"])
-                        with self.workflow_eval_lock:
-                            self.workflow_eval_lock.notify()
-                    self.task_queue_add(done_cb)
-
+                    self.task_queue_add(partial(j.done, event["properties"]["new_attributes"]))
+                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -251,7 +249,7 @@ class ArvCwlRunner(object):
             logger.exception("Fatal error in state polling thread.")
             with self.workflow_eval_lock:
                 self.processes.clear()
-                self.workflow_eval_lock.notify()
+                self.workflow_eval_lock.notifyAll()
         finally:
             self.stop_polling.set()
 
@@ -571,7 +569,7 @@ class ArvCwlRunner(object):
                     if (self.in_flight + len(self.processes)) > 0:
                         self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()