X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/69c8df415d721461135331a50e98255a625b12d1..8c81e6c09228a9d7a3e8036624c60367615ddfc6:/sdk/cwl/arvados_cwl/__init__.py diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index fbef5347e6..c0e919c6ce 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -17,6 +17,8 @@ import json import re from functools import partial import pkg_resources # part of setuptools +import Queue +import time from cwltool.errors import WorkflowException import cwltool.main @@ -62,7 +64,9 @@ class ArvCwlRunner(object): """ - def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4): + def __init__(self, api_client, work_api=None, keep_client=None, + output_name=None, output_tags=None, num_retries=4, + parallel_submit_count=4): self.api = api_client self.processes = {} self.in_flight = 0 @@ -82,6 +86,10 @@ class ArvCwlRunner(object): self.intermediate_output_ttl = 0 self.intermediate_output_collections = [] self.trash_intermediate = False + self.runnable_queue = Queue.Queue() + self.runnable_queue_threads = [] + self.parallel_submit_count = parallel_submit_count + self.poll_interval = 12 if keep_client is not None: self.keep_client = keep_client @@ -123,23 +131,31 @@ class ArvCwlRunner(object): return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs) def output_callback(self, out, processStatus): - if processStatus == "success": - logger.info("Overall process status is %s", processStatus) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Complete"}).execute(num_retries=self.num_retries) - else: - logger.warn("Overall process status is %s", processStatus) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Failed"}).execute(num_retries=self.num_retries) - self.final_status = processStatus - self.final_output = out + with self.workflow_eval_lock: + if processStatus == "success": + logger.info("Overall process status is %s", processStatus) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Complete"}).execute(num_retries=self.num_retries) + else: + logger.warn("Overall process status is %s", processStatus) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) + self.final_status = processStatus + self.final_output = out + + def runnable_queue_func(self): + while True: + task = self.runnable_queue.get() + if task is None: + return + task() def start_run(self, runnable, kwargs): with self.workflow_eval_lock: self.in_flight += 1 - runnable.run(**kwargs) + self.runnable_queue.put(partial(runnable.run, **kwargs)) def process_submitted(self, container): with self.workflow_eval_lock: @@ -151,6 +167,13 @@ class ArvCwlRunner(object): if uuid in self.processes: del self.processes[uuid] + def wrapped_callback(self, cb, obj, st): + with self.workflow_eval_lock: + cb(obj, st) + + def get_wrapped_callback(self, cb): + return partial(self.wrapped_callback, cb) + def on_message(self, event): if "object_uuid" in event: if event["object_uuid"] in self.processes and event["event_type"] == "update": @@ -166,9 +189,12 @@ class ArvCwlRunner(object): with self.workflow_eval_lock: j = self.processes[uuid] logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"]) - with Perf(metrics, "done %s" % j.name): - j.done(event["properties"]["new_attributes"]) - self.workflow_eval_lock.notify() + def done_cb(): + j.done(event["properties"]["new_attributes"]) + with self.workflow_eval_lock: + self.workflow_eval_lock.notify() + self.runnable_queue.put(done_cb) + def label(self, obj): return "[%s %s]" % (self.work_api[0:-1], obj.name) @@ -180,15 +206,19 @@ class ArvCwlRunner(object): """ try: + remain_wait = self.poll_interval while True: - self.stop_polling.wait(15) + if remain_wait > 0: + self.stop_polling.wait(remain_wait) if self.stop_polling.is_set(): break with self.workflow_eval_lock: keys = list(self.processes.keys()) if not keys: + remain_wait = self.poll_interval continue + begin_poll = time.time() if self.work_api == "containers": table = self.poll_api.container_requests() elif self.work_api == "jobs": @@ -198,6 +228,7 @@ class ArvCwlRunner(object): proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries) except Exception as e: logger.warn("Error checking states on API server: %s", e) + remain_wait = self.poll_interval continue for p in proc_states["items"]: @@ -208,9 +239,11 @@ class ArvCwlRunner(object): "new_attributes": p } }) + finish_poll = time.time() + remain_wait = self.poll_interval - (finish_poll - begin_poll) except: - logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False)) - with workflow_eval_lock: + logger.exception("Fatal error in state polling thread.") + with self.workflow_eval_lock: self.processes.clear() self.workflow_eval_lock.notify() finally: @@ -496,6 +529,11 @@ class ArvCwlRunner(object): self.polling_thread = threading.Thread(target=self.poll_states) self.polling_thread.start() + for r in xrange(0, self.parallel_submit_count): + t = threading.Thread(target=self.runnable_queue_func) + self.runnable_queue_threads.append(t) + t.start() + if runnerjob: jobiter = iter((runnerjob,)) else: @@ -525,7 +563,7 @@ class ArvCwlRunner(object): self.start_run(runnable, kwargs) else: if (self.in_flight + len(self.processes)) > 0: - self.workflow_eval_lock.wait(1) + self.workflow_eval_lock.wait(3) else: logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.") break @@ -533,7 +571,7 @@ class ArvCwlRunner(object): loopperf.__exit__() while self.processes: - self.workflow_eval_lock.wait(1) + self.workflow_eval_lock.wait(3) except UnsupportedRequirement: raise @@ -550,8 +588,18 @@ class ArvCwlRunner(object): body={"priority": "0"}).execute(num_retries=self.num_retries) finally: self.workflow_eval_lock.release() + try: + # Drain queue + while not self.runnable_queue.empty(): + self.runnable_queue.get() + except Queue.Empty: + pass self.stop_polling.set() self.polling_thread.join() + for t in self.runnable_queue_threads: + self.runnable_queue.put(None) + for t in self.runnable_queue_threads: + t.join() if self.final_status == "UnsupportedRequirement": raise UnsupportedRequirement("Check log for details.") @@ -701,6 +749,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser action="store_true", default=False, help=argparse.SUPPRESS) + parser.add_argument("--parallel-submit-count", type=int, + default=4, help="Submit requests in parallel (default 4)") + exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--trash-intermediate", action="store_true", default=False, dest="trash_intermediate",