     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.in_flight = 0
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
         self.uploaded = {}

         self.final_status = processStatus
         self.final_output = out
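Backing the Condition with threading.RLock() (rather than the default non-reentrant Lock) matters here: the main loop holds workflow_eval_lock while it dispatches work, and the helper methods added below re-acquire the same lock, so the lock must tolerate nested acquisition by the owning thread. A minimal standalone sketch of that reentrancy (names below are illustrative, not part of this patch):

```python
import threading

cond = threading.Condition(threading.RLock())

def helper():
    # Nested acquisition by the owning thread succeeds because the
    # underlying lock is reentrant; a plain Lock would deadlock here.
    with cond:
        pass

with cond:
    helper()
print("reentrant acquisition OK")
```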
+    def start_run(self, runnable, kwargs):
+        # Count the submission as "in flight" until process_submitted()
+        # registers it, so the main loop knows work is still pending.
+        with self.workflow_eval_lock:
+            self.in_flight += 1
+        runnable.run(**kwargs)
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+            self.in_flight -= 1
+
+    def process_done(self, uuid):
+        with self.workflow_eval_lock:
+            if uuid in self.processes:
+                del self.processes[uuid]
+
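Together these helpers move a job through three bookkeeping states: start_run counts it as in flight before the submission API call, process_submitted trades the in-flight count for an entry in self.processes, and process_done drops the entry once a terminal state arrives. A hedged sketch of that lifecycle against a stub executor (FakeExecutor and FakeRunnable are illustrative stand-ins, not part of the patch):

```python
import threading

class FakeExecutor:
    # Stand-in mirroring the patch's bookkeeping, for illustration only.
    def __init__(self):
        self.processes = {}
        self.in_flight = 0
        self.workflow_eval_lock = threading.Condition(threading.RLock())

    def start_run(self, runnable, kwargs):
        with self.workflow_eval_lock:
            self.in_flight += 1
        runnable.run(**kwargs)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container
            self.in_flight -= 1

    def process_done(self, uuid):
        with self.workflow_eval_lock:
            if uuid in self.processes:
                del self.processes[uuid]

class FakeRunnable:
    uuid = "zzzzz-xvhdp-000000000000000"  # hypothetical container request uuid
    def __init__(self, executor):
        self.executor = executor
    def run(self):
        # A real job would create the container request via the API here,
        # then register itself once the server assigns a uuid.
        self.executor.process_submitted(self)

ex = FakeExecutor()
ex.start_run(FakeRunnable(ex), {})
assert ex.in_flight == 0 and FakeRunnable.uuid in ex.processes
ex.process_done(FakeRunnable.uuid)    # removes the entry
ex.process_done(FakeRunnable.uuid)    # unknown uuid: harmless no-op
assert not ex.processes
```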
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                     uuid = event["object_uuid"]
-                    with self.lock:
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is Running", self.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                     uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                         with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                        self.workflow_eval_lock.notify()
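on_message runs on the event-stream and polling threads, so this notify() is what wakes the main loop out of its workflow_eval_lock.wait(1) when a job changes state. A minimal two-thread sketch of that handshake, assuming nothing beyond the standard library:

```python
import threading

cond = threading.Condition(threading.RLock())
events = []

def on_message():
    # Event-thread side: record the state change, then wake the waiter.
    with cond:
        events.append("Complete")
        cond.notify()

threading.Thread(target=on_message).start()
with cond:
    while not events:
        cond.wait(1)  # releases the lock while blocked, reacquires on wakeup
print(events[0])
```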
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)

                 self.stop_polling.wait(15)
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
                     continue

                     })
         except:
             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notify()
         finally:
             self.stop_polling.set()

                                **kwargs)
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Holds the lock while this code runs; the lock is released
+            # only inside self.workflow_eval_lock.wait(), at which point
+            # on_message can update job state and process output
+            # callbacks.
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()

                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
             loopperf.__exit__()

             while self.processes:
-                self.cond.wait(1)
+                self.workflow_eval_lock.wait(1)
         except UnsupportedRequirement:
             raise

                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
             self.stop_polling.set()
             self.polling_thread.join()
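The reworked wait condition is the heart of the fix: between start_run incrementing in_flight and process_submitted registering the container, self.processes is empty, so the old test (if self.processes:) could conclude the workflow was deadlocked while a submission was still under way. Including in_flight closes that window. A small illustration of the predicate (the function name is mine, for illustration only):

```python
def work_pending(in_flight, processes):
    # Mirrors the loop's test: jobs dispatched but not yet registered
    # (in_flight) count as pending alongside registered processes.
    return (in_flight + len(processes)) > 0

assert work_pending(1, {})                  # dispatched, not yet registered: wait
assert work_pending(0, {"uuid": object()})  # registered and running: wait
assert not work_pending(0, {})              # truly idle: report deadlock
```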

         ).execute(num_retries=self.arvrunner.num_retries)

         self.uuid = response["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)

         if response["state"] == "Final":
             logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])

             processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
 class RunnerContainer(Runner):

         ).execute(num_retries=self.arvrunner.num_retries)

         self.uuid = response["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)

         logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])

         else:
             super(RunnerContainer, self).done(container)
         finally:
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
                 find_or_create=enable_reuse
             ).execute(num_retries=self.arvrunner.num_retries)

-            self.arvrunner.processes[response["uuid"]] = self
+            self.uuid = response["uuid"]
+            self.arvrunner.process_submitted(self)

             self.update_pipeline_component(response)

             processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
+
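One contract worth noting across these call sites: process_submitted keys the registry on the submitted object's own uuid attribute, which is why the job variant above now assigns self.uuid = response["uuid"] before calling it (previously the dict was keyed directly on response["uuid"]). A minimal sketch of that ordering contract (illustrative names, hypothetical uuid):

```python
class MiniExecutor:
    # Illustrative registry keyed on the submitted object's uuid attribute.
    def __init__(self):
        self.processes = {}
    def process_submitted(self, container):
        self.processes[container.uuid] = container

class Submission:
    def __init__(self, executor, response):
        self.uuid = response["uuid"]       # must be set before registering,
        executor.process_submitted(self)   # since the registry reads self.uuid

ex = MiniExecutor()
s = Submission(ex, {"uuid": "zzzzz-8i9sb-000000000000000"})
assert ex.processes["zzzzz-8i9sb-000000000000000"] is s
```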
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

             return

         self.uuid = job["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)

         if job["state"] in ("Complete", "Failed", "Cancelled"):
             self.done(job)