projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
14510: Performance fixes
[arvados.git]
/
sdk
/
cwl
/
arvados_cwl
/
executor.py
diff --git
a/sdk/cwl/arvados_cwl/executor.py
b/sdk/cwl/arvados_cwl/executor.py
index 6cac709260fcef6f464d5d7e54706305e883a685..ff8ff6ff89fc76bc40359194025f7b4c5c31fec1 100644
(file)
--- a/
sdk/cwl/arvados_cwl/executor.py
+++ b/
sdk/cwl/arvados_cwl/executor.py
@@
-122,7
+122,8
@@
class ArvCwlExecutor(object):
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=arvargs.collection_cache)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
@@
-206,7
+207,8
@@
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
def start_run(self, runnable, runtimeContext):
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
def process_submitted(self, container):
with self.workflow_eval_lock:
@@
-216,7
+218,8
@@
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
@@
-676,6
+679,10
@@
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()
loopperf.__enter__()
loopperf.__exit__()