13108: Rename runnable_queue -> task_queue
author Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 6 Apr 2018 12:44:23 +0000 (08:44 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 6 Apr 2018 12:49:50 +0000 (08:49 -0400)
Execute synchronously when parallel_submit_count == 1

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py

index c0e919c6ce9c52fc0f6579f00d211f221e569757..f70fa65fb9d8ea6018e726cfe3ee9489f53d83a2 100644 (file)
@@ -86,8 +86,8 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.runnable_queue = Queue.Queue()
-        self.runnable_queue_threads = []
+        self.task_queue = Queue.Queue()
+        self.task_queue_threads = []
         self.parallel_submit_count = parallel_submit_count
         self.poll_interval = 12
 
@@ -145,17 +145,23 @@ class ArvCwlRunner(object):
             self.final_status = processStatus
             self.final_output = out
 
-    def runnable_queue_func(self):
+    def task_queue_func(self):
         while True:
-            task = self.runnable_queue.get()
+            task = self.task_queue.get()
             if task is None:
                 return
             task()
 
+    def task_queue_add(self, task):
+        if self.parallel_submit_count > 1:
+            self.task_queue.put(task)
+        else:
+            task()
+
     def start_run(self, runnable, kwargs):
         with self.workflow_eval_lock:
             self.in_flight += 1
-        self.runnable_queue.put(partial(runnable.run, **kwargs))
+        self.task_queue_add(partial(runnable.run, **kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -193,7 +199,7 @@ class ArvCwlRunner(object):
                         j.done(event["properties"]["new_attributes"])
                         with self.workflow_eval_lock:
                             self.workflow_eval_lock.notify()
-                    self.runnable_queue.put(done_cb)
+                    self.task_queue_add(done_cb)
 
 
     def label(self, obj):
@@ -530,8 +536,8 @@ class ArvCwlRunner(object):
         self.polling_thread.start()
 
         for r in xrange(0, self.parallel_submit_count):
-            t = threading.Thread(target=self.runnable_queue_func)
-            self.runnable_queue_threads.append(t)
+            t = threading.Thread(target=self.task_queue_func)
+            self.task_queue_threads.append(t)
             t.start()
 
         if runnerjob:
@@ -570,7 +576,7 @@ class ArvCwlRunner(object):
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
+            while (self.in_flight + len(self.processes)) > 0:
                 self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
@@ -590,15 +596,15 @@ class ArvCwlRunner(object):
             self.workflow_eval_lock.release()
             try:
                 # Drain queue
-                while not self.runnable_queue.empty():
-                    self.runnable_queue.get()
+                while not self.task_queue.empty():
+                    self.task_queue.get()
             except Queue.Empty:
                 pass
             self.stop_polling.set()
             self.polling_thread.join()
-            for t in self.runnable_queue_threads:
-                self.runnable_queue.put(None)
-            for t in self.runnable_queue_threads:
+            for t in self.task_queue_threads:
+                self.task_queue.put(None)
+            for t in self.task_queue_threads:
                 t.join()
 
         if self.final_status == "UnsupportedRequirement":