self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.runnable_queue = Queue.Queue()
- self.runnable_queue_threads = []
+ self.task_queue = Queue.Queue()
+ self.task_queue_threads = []
self.parallel_submit_count = parallel_submit_count
self.poll_interval = 12
self.final_status = processStatus
self.final_output = out
- def runnable_queue_func(self):
+ def task_queue_func(self):
+ """Worker loop: take callables from self.task_queue and call each one.
+
+ Blocks on the queue between tasks and returns when it receives a
+ None sentinel.
+ """
while True:
- task = self.runnable_queue.get()
+ task = self.task_queue.get()
if task is None:
return
task()
+ def task_queue_add(self, task):
+ """Run or enqueue a zero-argument callable.
+
+ When parallel_submit_count is 1 or less the task is called
+ immediately in the calling thread; otherwise it is put on
+ self.task_queue.
+ """
+ if self.parallel_submit_count <= 1:
+ task()
+ else:
+ self.task_queue.put(task)
+
def start_run(self, runnable, kwargs):
+ """Count a new in-flight run and submit runnable.run(**kwargs) as a task.
+
+ self.in_flight is incremented while holding self.workflow_eval_lock;
+ the bound call is handed to task_queue_add.
+ """
with self.workflow_eval_lock:
self.in_flight += 1
- self.runnable_queue.put(partial(runnable.run, **kwargs))
+ self.task_queue_add(partial(runnable.run, **kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
j.done(event["properties"]["new_attributes"])
with self.workflow_eval_lock:
self.workflow_eval_lock.notify()
- self.runnable_queue.put(done_cb)
+ self.task_queue_add(done_cb)
def label(self, obj):
self.polling_thread.start()
for r in xrange(0, self.parallel_submit_count):
- t = threading.Thread(target=self.runnable_queue_func)
- self.runnable_queue_threads.append(t)
+ t = threading.Thread(target=self.task_queue_func)
+ self.task_queue_threads.append(t)
t.start()
if runnerjob:
loopperf.__enter__()
loopperf.__exit__()
- while self.processes:
+ while (self.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
self.workflow_eval_lock.release()
try:
# Drain queue
- while not self.runnable_queue.empty():
- self.runnable_queue.get()
+ # Use a non-blocking get: the worker threads are still running here,
+ # so one of them may take the last item between the empty() check
+ # and get(). A blocking get() would then wait forever and could
+ # never raise Queue.Empty, making the handler below unreachable.
+ while not self.task_queue.empty():
+ self.task_queue.get(block=False)
except Queue.Empty:
pass
self.stop_polling.set()
self.polling_thread.join()
- for t in self.runnable_queue_threads:
- self.runnable_queue.put(None)
- for t in self.runnable_queue_threads:
+ for t in self.task_queue_threads:
+ self.task_queue.put(None)
+ for t in self.task_queue_threads:
t.join()
if self.final_status == "UnsupportedRequirement":