from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
from ._version import __version__
from cwltool.pack import pack
"""
def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4,
- thread_count=4):
+ output_name=None, output_tags=None, num_retries=4):
self.api = api_client
self.processes = {}
- self.in_flight = 0
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.task_queue = Queue.Queue()
- self.task_queue_threads = []
- self.thread_count = thread_count
+ self.thread_count = 4
self.poll_interval = 12
if keep_client is not None:
self.final_output = out
self.workflow_eval_lock.notifyAll()
- def task_queue_func(self):
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- task()
- with self.workflow_eval_lock:
- self.in_flight -= 1
-
- def task_queue_add(self, task):
- with self.workflow_eval_lock:
- if self.thread_count > 1:
- self.in_flight += 1
- self.task_queue.put(task)
- else:
- task()
def start_run(self, runnable, kwargs):
- self.task_queue_add(partial(runnable.run, **kwargs))
+ self.task_queue.add(partial(runnable.run, **kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
with self.workflow_eval_lock:
j = self.processes[uuid]
- self.task_queue_add(partial(j.done, event["properties"]["new_attributes"]))
+ self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
def label(self, obj):
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
self.secret_store = kwargs.get("secret_store")
+ self.thread_count = kwargs.get("thread_count", 4)
self.trash_intermediate = kwargs["trash_intermediate"]
if self.trash_intermediate and self.work_api != "containers":
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
- for r in xrange(0, self.thread_count):
- t = threading.Thread(target=self.task_queue_func)
- self.task_queue_threads.append(t)
- t.start()
+ self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
if runnerjob:
jobiter = iter((runnerjob,))
if self.stop_polling.is_set():
break
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+
if runnable:
with Perf(metrics, "run"):
self.start_run(runnable, kwargs)
else:
- if (self.in_flight + len(self.processes)) > 0:
+ if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
else:
logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
loopperf.__enter__()
loopperf.__exit__()
- while (self.in_flight + len(self.processes)) > 0:
+ while (self.task_queue.in_flight + len(self.processes)) > 0:
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
- try:
- # Drain queue
- while not self.task_queue.empty():
- self.task_queue.get()
- except Queue.Empty:
- pass
+ self.task_queue.drain()
self.stop_polling.set()
self.polling_thread.join()
- for t in self.task_queue_threads:
- self.task_queue.put(None)
- for t in self.task_queue_threads:
- t.join()
+ self.task_queue.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
--- /dev/null
+import Queue
+import threading
+import logging
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+class TaskQueue(object):
+    """Run callables on a fixed pool of worker threads.
+
+    ``in_flight`` counts tasks that have been queued but not yet finished.
+    It is only modified while holding ``lock``, which the caller shares
+    with its own bookkeeping so both can be inspected consistently.
+    The first exception raised by a queued task is stored in ``error`` so
+    the caller can re-raise it on its own thread.
+    """
+
+    def __init__(self, lock, thread_count):
+        """Create the queue and start ``thread_count`` worker threads.
+
+        :param lock: lock (or Condition) guarding ``in_flight`` and
+            ``error``; shared with the caller.
+        :param thread_count: number of worker threads to start.  When it is
+            1 or less, ``add`` runs tasks synchronously instead of queueing.
+        """
+        self.thread_count = thread_count
+        self.task_queue = Queue.Queue()
+        self.task_queue_threads = []
+        self.lock = lock
+        self.in_flight = 0
+        self.error = None
+
+        for _ in xrange(0, self.thread_count):
+            t = threading.Thread(target=self.task_queue_func)
+            self.task_queue_threads.append(t)
+            t.start()
+
+    def task_queue_func(self):
+        """Worker loop: run queued tasks until a ``None`` sentinel arrives."""
+        while True:
+            task = self.task_queue.get()
+            if task is None:
+                return
+            try:
+                task()
+            except Exception as e:
+                logger.exception("Unexpected error running task")
+                with self.lock:
+                    # Keep the first failure; later errors are frequently
+                    # consequences of it and would hide the root cause.
+                    if self.error is None:
+                        self.error = e
+
+            with self.lock:
+                self.in_flight -= 1
+
+    def add(self, task):
+        """Schedule ``task``, a callable taking no arguments.
+
+        With more than one worker thread the task is queued and counted in
+        ``in_flight``.  Otherwise it runs immediately on the calling thread
+        while ``lock`` is held, and any exception propagates to the caller.
+        """
+        with self.lock:
+            if self.thread_count > 1:
+                self.in_flight += 1
+                self.task_queue.put(task)
+            else:
+                task()
+
+    def drain(self):
+        """Discard every task still waiting in the queue without blocking.
+
+        Worker threads consume the same queue concurrently, so checking
+        ``empty()`` and then calling a blocking ``get()`` could hang forever
+        if a worker takes the last item in between; ``get_nowait`` avoids
+        that.  Discarded tasks never run, so they are removed from
+        ``in_flight`` to keep the counter accurate.
+        """
+        while True:
+            try:
+                task = self.task_queue.get_nowait()
+            except Queue.Empty:
+                return
+            if task is not None:
+                with self.lock:
+                    self.in_flight -= 1
+
+    def join(self):
+        """Stop all worker threads and wait for them to exit.
+
+        One ``None`` sentinel is queued per worker; each worker first
+        finishes its current task and any tasks queued ahead of the sentinel.
+        """
+        for _ in self.task_queue_threads:
+            self.task_queue.put(None)
+        for t in self.task_queue_threads:
+            t.join()