default=None,
metavar="CLUSTER_ID")
+ parser.add_argument("--collection-cache", type=int,
+ default=256*1024*1024,
+ help="Collection cache size.")
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=arvargs.collection_cache)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
- self.task_queue = Queue.Queue()
+ self.task_queue = Queue.Queue(maxsize=self.thread_count)
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
with self.lock:
self.in_flight -= 1
- def add(self, task):
+ def add(self, task, unlock, check_done):
with self.lock:
if self.thread_count > 1:
self.in_flight += 1
- self.task_queue.put(task)
else:
task()
+ return
+
+ while True:
+ try:
+ unlock.release()
+ self.task_queue.put(task, block=True, timeout=3)
+ return
+ except Queue.Full:
+ if check_done.is_set():
+ return
+ finally:
+ unlock.acquire()
+
def drain(self):
try: