14510: Performance fixes
author: Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 20 Nov 2018 19:33:53 +0000 (14:33 -0500)
committer: Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 27 Nov 2018 21:01:38 +0000 (16:01 -0500)
* Add --collection-cache to enable users to workaround cache thrashing

* Limit task queue size.  Release workflow lock when attempting to
  enqueue a task (which now may block).

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/task_queue.py

index 9b814f534c11af95aa59b77cdb3eaaa737866195..ce22219d7a71ea9d5434f50023d0394ba964cc57 100644 (file)
@@ -159,6 +159,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                          default=None,
                          metavar="CLUSTER_ID")
 
+    parser.add_argument("--collection-cache", type=int,
+                        default=256*1024*1024,
+                        help="Collection caches size.")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
index 6cac709260fcef6f464d5d7e54706305e883a685..ff8ff6ff89fc76bc40359194025f7b4c5c31fec1 100644 (file)
@@ -122,7 +122,8 @@ class ArvCwlExecutor(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+                                                cap=arvargs.collection_cache)
 
         self.fetcher_constructor = partial(CollectionFetcher,
                                            api_client=self.api,
@@ -206,7 +207,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
 
     def start_run(self, runnable, runtimeContext):
-        self.task_queue.add(partial(runnable.run, runtimeContext))
+        self.task_queue.add(partial(runnable.run, runtimeContext),
+                            self.workflow_eval_lock, self.stop_polling)
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -216,7 +218,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         with self.workflow_eval_lock:
             j = self.processes[uuid]
             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
-            self.task_queue.add(partial(j.done, record))
+            self.task_queue.add(partial(j.done, record),
+                                self.workflow_eval_lock, self.stop_polling)
             del self.processes[uuid]
 
     def runtime_status_update(self, kind, message, detail=None):
@@ -676,6 +679,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     else:
                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
+
+                if self.stop_polling.is_set():
+                    break
+
                 loopperf.__enter__()
             loopperf.__exit__()
 
index b9fd09807b452c1b06738ef1a7df72fd9dcc8708..018172b591b761b90825c9160b366b28a35d6b94 100644 (file)
@@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner')
 class TaskQueue(object):
     def __init__(self, lock, thread_count):
         self.thread_count = thread_count
-        self.task_queue = Queue.Queue()
+        self.task_queue = Queue.Queue(maxsize=self.thread_count)
         self.task_queue_threads = []
         self.lock = lock
         self.in_flight = 0
@@ -37,13 +37,25 @@ class TaskQueue(object):
                 with self.lock:
                     self.in_flight -= 1
 
-    def add(self, task):
+    def add(self, task, unlock, check_done):
         with self.lock:
             if self.thread_count > 1:
                 self.in_flight += 1
-                self.task_queue.put(task)
             else:
                 task()
+                return
+
+        while True:
+            try:
+                unlock.release()
+                self.task_queue.put(task, block=True, timeout=3)
+                return
+            except Queue.Full:
+                if check_done.is_set():
+                    return
+            finally:
+                unlock.acquire()
+
 
     def drain(self):
         try: