import re
from functools import partial
import pkg_resources # part of setuptools
+import Queue
from cwltool.errors import WorkflowException
import cwltool.main
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
+ self.runnable_queue = Queue.Queue()
if keep_client is not None:
self.keep_client = keep_client
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
- if processStatus == "success":
- logger.info("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
- else:
- logger.warn("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- self.final_status = processStatus
- self.final_output = out
+ with self.workflow_eval_lock:
+ if processStatus == "success":
+ logger.info("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
+ else:
+ logger.warn("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
+ self.final_output = out
+
+ def runnable_queue_thread(self):
+ while True:
+ runnable, kwargs = self.runnable_queue.get()
+ runnable.run(**kwargs)
def start_run(self, runnable, kwargs):
with self.workflow_eval_lock:
self.in_flight += 1
- runnable.run(**kwargs)
+ self.runnable_queue.put((runnable, kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
if uuid in self.processes:
del self.processes[uuid]
+ def wrapped_callback(self, cb, obj, st):
+ with self.workflow_eval_lock:
+ cb(obj, st)
+
+ def get_wrapped_callback(self, cb):
+ return partial(self.wrapped_callback, cb)
+
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.processes and event["event_type"] == "update":
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
+ threading.Thread(target=self.runnable_queue_thread).start()
+
if runnerjob:
jobiter = iter((runnerjob,))
else: