import cwltool.workflow
import arvados
-import arvados.events
import arvados.config
from .arvcontainer import ArvadosContainer, RunnerContainer
self.num_retries = 4
self.uuid = None
self.work_api = work_api
+ self.stop_polling = threading.Event()
+ self.poll_api = None
if self.work_api is None:
# todo: autodetect API to use.
finally:
self.cond.release()
+ def poll_states(self):
+ """Poll status of jobs or containers listed in the processes dict.
+
+ Runs in a separate thread.
+ """
+
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
+
+ if self.work_api == "containers":
+ table = self.poll_api.containers()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
+
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+
def get_uploaded(self):
return self.uploaded.copy()
runnerjob.run()
return runnerjob.uuid
- arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
- if self.work_api == "containers":
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
- if self.work_api == "jobs":
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ self.poll_api = arvados.api('v1')
+ self.polling_thread = threading.Thread(target=self.poll_states)
+ self.polling_thread.start()
if runnerjob:
jobiter = iter((runnerjob,))
while self.processes:
self.cond.wait(1)
- events.close()
except UnsupportedRequirement:
raise
except:
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.cond.release()
+ self.stop_polling.set()
+ self.polling_thread.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")