from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
from ._version import __version__
from cwltool.pack import pack
"""
def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4,
- parallel_submit_count=4):
+ output_name=None, output_tags=None, num_retries=4):
self.api = api_client
self.processes = {}
- self.in_flight = 0
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.task_queue = Queue.Queue()
- self.task_queue_threads = []
- self.parallel_submit_count = parallel_submit_count
+ self.thread_count = 4
self.poll_interval = 12
if keep_client is not None:
body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.final_status = processStatus
self.final_output = out
+ self.workflow_eval_lock.notifyAll()
- def task_queue_func(self):
# Deleted by this patch. Old worker loop: repeatedly takes a task from
# self.task_queue and calls it; a task of None makes the loop return.
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- task()
-
- def task_queue_add(self, task):
# Deleted by this patch. Old dispatch helper: put the task on
# self.task_queue when parallel_submit_count > 1, otherwise called the
# task directly.
- if self.parallel_submit_count > 1:
- self.task_queue.put(task)
- else:
- task()
def start_run(self, runnable, kwargs):
# Schedule runnable.run(**kwargs) for execution.
#   runnable: object exposing a run() method.
#   kwargs:   dict expanded into keyword arguments for runnable.run.
# Removed lines: bumped self.in_flight under workflow_eval_lock and went
# through task_queue_add. Added line: hands the bound call straight to
# self.task_queue.add; the in-flight count is read from
# self.task_queue.in_flight by the main loop in this patch.
- with self.workflow_eval_lock:
- self.in_flight += 1
- self.task_queue_add(partial(runnable.run, **kwargs))
+ self.task_queue.add(partial(runnable.run, **kwargs))
def process_submitted(self, container):
# Register a submitted container in self.processes, keyed by
# container.uuid, while holding workflow_eval_lock.
# The patch drops the self.in_flight decrement that used to happen here.
with self.workflow_eval_lock:
self.processes[container.uuid] = container
- self.in_flight -= 1
def process_done(self, uuid):
with self.workflow_eval_lock:
def wrapped_callback(self, cb, obj, st):
# Call cb(obj, st) while holding workflow_eval_lock.
# The patch adds a notifyAll() on the same condition after the callback.
# NOTE(review): indentation is not visible in this diff; the notifyAll()
# is presumably inside the `with` block, since Condition.notifyAll()
# requires the lock to be held -- confirm against the real file.
with self.workflow_eval_lock:
cb(obj, st)
+ self.workflow_eval_lock.notifyAll()
def get_wrapped_callback(self, cb):
# Return a partial of self.wrapped_callback with cb pre-bound; the
# returned callable takes the remaining (obj, st) arguments.
return partial(self.wrapped_callback, cb)
def on_message(self, event):
# Handle one event dict.
# Only events that carry "object_uuid", whose uuid is a key of
# self.processes, and whose "event_type" is "update" are acted on.
# The new state is read from event["properties"]["new_attributes"]["state"].
if "object_uuid" in event:
if event["object_uuid"] in self.processes and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
- uuid = event["object_uuid"]
# Patch: uuid is now assigned once, before the state checks, and the
# `running is False` test is moved after the workflow_eval_lock
# acquisition below.
+ uuid = event["object_uuid"]
+ if event["properties"]["new_attributes"]["state"] == "Running":
with self.workflow_eval_lock:
j = self.processes[uuid]
- logger.info("%s %s is Running", self.label(j), uuid)
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
+ if j.running is False:
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
+ logger.info("%s %s is Running", self.label(j), uuid)
# States handled by this branch: the process is handed to j.done() with
# the new attributes.
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
- uuid = event["object_uuid"]
with self.workflow_eval_lock:
j = self.processes[uuid]
- logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
- def done_cb():
- j.done(event["properties"]["new_attributes"])
- with self.workflow_eval_lock:
- self.workflow_eval_lock.notify()
- self.task_queue_add(done_cb)
-
# Patch: the done_cb closure (which called j.done and then notified the
# condition) is replaced by queuing a partial of j.done on
# self.task_queue; no explicit notify is issued from here any more.
+ self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+ logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
def label(self, obj):
# Build the "[<kind> <name>]" prefix used in the log messages of this
# class: self.work_api with its last character dropped, then obj.name.
# presumably turns a plural such as "containers" into "container" -- verify.
return "[%s %s]" % (self.work_api[0:-1], obj.name)
logger.exception("Fatal error in state polling thread.")
with self.workflow_eval_lock:
self.processes.clear()
- self.workflow_eval_lock.notify()
+ self.workflow_eval_lock.notifyAll()
finally:
self.stop_polling.set()
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
self.secret_store = kwargs.get("secret_store")
+ self.thread_count = kwargs.get("thread_count", 4)
self.trash_intermediate = kwargs["trash_intermediate"]
if self.trash_intermediate and self.work_api != "containers":
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run(wait=kwargs.get("wait"))
+ runnerjob.run(**kwargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
- for r in xrange(0, self.parallel_submit_count):
- t = threading.Thread(target=self.task_queue_func)
- self.task_queue_threads.append(t)
- t.start()
+ self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
if runnerjob:
jobiter = iter((runnerjob,))
if self.stop_polling.is_set():
break
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+
if runnable:
with Perf(metrics, "run"):
self.start_run(runnable, kwargs)
else:
- if (self.in_flight + len(self.processes)) > 0:
+ if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
break
loopperf.__enter__()
loopperf.__exit__()
- while (self.in_flight + len(self.processes)) > 0:
+ while (self.task_queue.in_flight + len(self.processes)) > 0:
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
- try:
- # Drain queue
- while not self.task_queue.empty():
- self.task_queue.get()
- except Queue.Empty:
- pass
+ self.task_queue.drain()
self.stop_polling.set()
self.polling_thread.join()
- for t in self.task_queue_threads:
- self.task_queue.put(None)
- for t in self.task_queue_threads:
- t.join()
+ self.task_queue.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
action="store_true", default=False,
help=argparse.SUPPRESS)
- parser.add_argument("--parallel-submit-count", type=int,
- default=4, help="Submit requests in parallel (default 4)")
+ parser.add_argument("--thread-count", type=int,
+ default=4, help="Number of threads to use for job submit and output collection.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",