From 00bb1461d14cfc02e6ec2c74d622b7b6b716e775 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 20 Nov 2018 14:33:53 -0500 Subject: [PATCH] 14510: Perfomance fixes * Add --collection-cache to enable users to workaround cache thrashing * Limit task queue size. Release workflow lock when attempting to enqueue a task (which now may block). Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/__init__.py | 4 ++++ sdk/cwl/arvados_cwl/executor.py | 13 ++++++++++--- sdk/cwl/arvados_cwl/task_queue.py | 18 +++++++++++++++--- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 9b814f534c..ce22219d7a 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -159,6 +159,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser default=None, metavar="CLUSTER_ID") + parser.add_argument("--collection-cache", type=int, + default=256*1024*1024, + help="Collection caches size.") + parser.add_argument("--name", type=str, help="Name to use for workflow execution instance.", default=None) diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 6cac709260..ff8ff6ff89 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -122,7 +122,8 @@ class ArvCwlExecutor(object): else: self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries) - self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries) + self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries, + cap=arvargs.collection_cache) self.fetcher_constructor = partial(CollectionFetcher, api_client=self.api, @@ -206,7 +207,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods def start_run(self, runnable, runtimeContext): - self.task_queue.add(partial(runnable.run, runtimeContext)) + self.task_queue.add(partial(runnable.run, runtimeContext), + self.workflow_eval_lock, self.stop_polling) def process_submitted(self, container): with self.workflow_eval_lock: @@ -216,7 +218,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods with self.workflow_eval_lock: j = self.processes[uuid] logger.info("%s %s is %s", self.label(j), uuid, record["state"]) - self.task_queue.add(partial(j.done, record)) + self.task_queue.add(partial(j.done, record), + self.workflow_eval_lock, self.stop_polling) del self.processes[uuid] def runtime_status_update(self, kind, message, detail=None): @@ -676,6 +679,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods else: logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.") break + + if self.stop_polling.is_set(): + break + loopperf.__enter__() loopperf.__exit__() diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py index b9fd09807b..018172b591 100644 --- a/sdk/cwl/arvados_cwl/task_queue.py +++ b/sdk/cwl/arvados_cwl/task_queue.py @@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner') class TaskQueue(object): def __init__(self, lock, thread_count): self.thread_count = thread_count - self.task_queue = Queue.Queue() + self.task_queue = Queue.Queue(maxsize=self.thread_count) self.task_queue_threads = [] self.lock = lock self.in_flight = 0 @@ -37,13 +37,25 @@ class TaskQueue(object): with self.lock: self.in_flight -= 1 - def add(self, task): + def add(self, task, unlock, check_done): with self.lock: if self.thread_count > 1: self.in_flight += 1 - self.task_queue.put(task) else: task() + return + + while True: + try: + unlock.release() + self.task_queue.put(task, block=True, timeout=3) + return + except Queue.Full: + if check_done.is_set(): + return + finally: + unlock.acquire() + def drain(self): try: -- 2.39.5