From 69c8df415d721461135331a50e98255a625b12d1 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 5 Apr 2018 16:32:19 -0400 Subject: [PATCH] 13108: Simplify locking, add methods for recording process status Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/__init__.py | 58 +++++++++++++++++------------ sdk/cwl/arvados_cwl/arvcontainer.py | 10 ++--- sdk/cwl/arvados_cwl/arvjob.py | 9 +++-- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index c0dd9d38f0..fbef5347e6 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -65,8 +65,8 @@ class ArvCwlRunner(object): def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4): self.api = api_client self.processes = {} - self.lock = threading.Lock() - self.cond = threading.Condition(self.lock) + self.in_flight = 0 + self.workflow_eval_lock = threading.Condition(threading.RLock()) self.final_output = None self.final_status = None self.uploaded = {} @@ -136,27 +136,39 @@ class ArvCwlRunner(object): self.final_status = processStatus self.final_output = out + def start_run(self, runnable, kwargs): + with self.workflow_eval_lock: + self.in_flight += 1 + runnable.run(**kwargs) + + def process_submitted(self, container): + with self.workflow_eval_lock: + self.processes[container.uuid] = container + self.in_flight -= 1 + + def process_done(self, uuid): + with self.workflow_eval_lock: + if uuid in self.processes: + del self.processes[uuid] + def on_message(self, event): if "object_uuid" in event: if event["object_uuid"] in self.processes and event["event_type"] == "update": if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False: uuid = event["object_uuid"] - with self.lock: + with self.workflow_eval_lock: j = self.processes[uuid] logger.info("%s %s is Running", self.label(j), uuid) j.running = True j.update_pipeline_component(event["properties"]["new_attributes"]) elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"): uuid = event["object_uuid"] - try: - self.cond.acquire() + with self.workflow_eval_lock: j = self.processes[uuid] logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"]) with Perf(metrics, "done %s" % j.name): j.done(event["properties"]["new_attributes"]) - self.cond.notify() - finally: - self.cond.release() + self.workflow_eval_lock.notify() def label(self, obj): return "[%s %s]" % (self.work_api[0:-1], obj.name) @@ -172,8 +184,8 @@ class ArvCwlRunner(object): self.stop_polling.wait(15) if self.stop_polling.is_set(): break - with self.lock: - keys = self.processes.keys() + with self.workflow_eval_lock: + keys = list(self.processes.keys()) if not keys: continue @@ -198,10 +210,9 @@ class ArvCwlRunner(object): }) except: logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False)) - self.cond.acquire() - self.processes.clear() - self.cond.notify() - self.cond.release() + with workflow_eval_lock: + self.processes.clear() + self.workflow_eval_lock.notify() finally: self.stop_polling.set() @@ -495,10 +506,11 @@ class ArvCwlRunner(object): **kwargs) try: - self.cond.acquire() - # Will continue to hold the lock for the duration of this code - # except when in cond.wait(), at which point on_message can update - # job state and process output callbacks. + self.workflow_eval_lock.acquire() + # Holds the lock while this code runs and releases it when + # it is safe to do so in self.workflow_eval_lock.wait(), + # at which point on_message can update job state and + # process output callbacks. loopperf = Perf(metrics, "jobiter") loopperf.__enter__() @@ -510,10 +522,10 @@ class ArvCwlRunner(object): if runnable: with Perf(metrics, "run"): - runnable.run(**kwargs) + self.start_run(runnable, kwargs) else: - if self.processes: - self.cond.wait(1) + if (self.in_flight + len(self.processes)) > 0: + self.workflow_eval_lock.wait(1) else: logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.") break @@ -521,7 +533,7 @@ class ArvCwlRunner(object): loopperf.__exit__() while self.processes: - self.cond.wait(1) + self.workflow_eval_lock.wait(1) except UnsupportedRequirement: raise @@ -537,7 +549,7 @@ class ArvCwlRunner(object): self.api.container_requests().update(uuid=runnerjob.uuid, body={"priority": "0"}).execute(num_retries=self.num_retries) finally: - self.cond.release() + self.workflow_eval_lock.release() self.stop_polling.set() self.polling_thread.join() diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 5c11babfc6..ab6dddb45c 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -254,7 +254,7 @@ class ArvadosContainer(object): ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] - self.arvrunner.processes[self.uuid] = self + self.arvrunner.process_submitted(self) if response["state"] == "Final": logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) @@ -315,8 +315,7 @@ class ArvadosContainer(object): processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"]) class RunnerContainer(Runner): @@ -446,7 +445,7 @@ class RunnerContainer(Runner): ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] - self.arvrunner.processes[self.uuid] = self + self.arvrunner.process_submitted(self) logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"]) @@ -464,5 +463,4 @@ class RunnerContainer(Runner): else: super(RunnerContainer, self).done(container) finally: - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"]) diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 88155b5b95..0c35115f93 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -150,7 +150,8 @@ class ArvadosJob(object): find_or_create=enable_reuse ).execute(num_retries=self.arvrunner.num_retries) - self.arvrunner.processes[response["uuid"]] = self + self.uuid = response["uuid"] + self.arvrunner.process_submitted(self) self.update_pipeline_component(response) @@ -263,8 +264,8 @@ class ArvadosJob(object): processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"]) + class RunnerJob(Runner): """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner.""" @@ -351,7 +352,7 @@ class RunnerJob(Runner): return self.uuid = job["uuid"] - self.arvrunner.processes[self.uuid] = self + self.arvrunner.process_submitted(self) if job["state"] in ("Complete", "Failed", "Cancelled"): self.done(job) -- 2.30.2