14510: Performance fixes
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
index 6cac709260fcef6f464d5d7e54706305e883a685..ff8ff6ff89fc76bc40359194025f7b4c5c31fec1 100644 (file)
@@ -122,7 +122,8 @@ class ArvCwlExecutor(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+                                                cap=arvargs.collection_cache)
 
         self.fetcher_constructor = partial(CollectionFetcher,
                                            api_client=self.api,
@@ -206,7 +207,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
 
     def start_run(self, runnable, runtimeContext):
-        self.task_queue.add(partial(runnable.run, runtimeContext))
+        self.task_queue.add(partial(runnable.run, runtimeContext),
+                            self.workflow_eval_lock, self.stop_polling)
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -216,7 +218,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         with self.workflow_eval_lock:
             j = self.processes[uuid]
             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
-            self.task_queue.add(partial(j.done, record))
+            self.task_queue.add(partial(j.done, record),
+                                self.workflow_eval_lock, self.stop_polling)
             del self.processes[uuid]
 
     def runtime_status_update(self, kind, message, detail=None):
@@ -676,6 +679,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     else:
                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
+
+                if self.stop_polling.is_set():
+                    break
+
                 loopperf.__enter__()
             loopperf.__exit__()