From e0fc29c78d959d19c3d63d3bfb204b1c444518bd Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 19 Aug 2016 10:50:25 -0400 Subject: [PATCH] 9820: Directly poll job or container records that we are interested in. Benefit: puts less load on database than log table polling, and doesn't miss events. --- sdk/cwl/arvados_cwl/__init__.py | 50 +++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 27af075f36..0d0d41611f 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -17,7 +17,6 @@ import cwltool.main import cwltool.workflow import arvados -import arvados.events import arvados.config from .arvcontainer import ArvadosContainer, RunnerContainer @@ -50,6 +49,8 @@ class ArvCwlRunner(object): self.num_retries = 4 self.uuid = None self.work_api = work_api + self.stop_polling = threading.Event() + self.poll_api = None if self.work_api is None: # todo: autodetect API to use. @@ -99,6 +100,41 @@ class ArvCwlRunner(object): finally: self.cond.release() + def poll_states(self): + """Poll status of jobs or containers listed in the processes dict. + + Runs in a separate thread. + """ + + while True: + self.stop_polling.wait(15) + if self.stop_polling.is_set(): + break + with self.lock: + keys = self.processes.keys() + if not keys: + continue + + if self.work_api == "containers": + table = self.poll_api.containers() + elif self.work_api == "jobs": + table = self.poll_api.jobs() + + try: + proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries) + except Exception as e: + logger.warn("Error checking states on API server: %s", e) + continue + + for p in proc_states["items"]: + self.on_message({ + "object_uuid": p["uuid"], + "event_type": "update", + "properties": { + "new_attributes": p + } + }) + def get_uploaded(self): return self.uploaded.copy() @@ -182,12 +218,9 @@ class ArvCwlRunner(object): runnerjob.run() return runnerjob.uuid - arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1" - - if self.work_api == "containers": - events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message) - if self.work_api == "jobs": - events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message) + self.poll_api = arvados.api('v1') + self.polling_thread = threading.Thread(target=self.poll_states) + self.polling_thread.start() if runnerjob: jobiter = iter((runnerjob,)) @@ -217,7 +250,6 @@ class ArvCwlRunner(object): while self.processes: self.cond.wait(1) - events.close() except UnsupportedRequirement: raise except: @@ -233,6 +265,8 @@ class ArvCwlRunner(object): body={"priority": "0"}).execute(num_retries=self.num_retries) finally: self.cond.release() + self.stop_polling.set() + self.polling_thread.join() if self.final_status == "UnsupportedRequirement": raise UnsupportedRequirement("Check log for details.") -- 2.30.2