From 540ecd0ae604df1cf02a63515e6e9e8e04e6e64a Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 5 Apr 2018 17:10:25 -0400 Subject: [PATCH] 13108: Work queue submit wip Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/__init__.py | 43 ++++++++++++++++++++--------- sdk/cwl/arvados_cwl/arvcontainer.py | 2 ++ sdk/cwl/arvados_cwl/arvjob.py | 2 ++ 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index fbef5347e6..73901555ac 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -17,6 +17,7 @@ import json import re from functools import partial import pkg_resources # part of setuptools +import Queue from cwltool.errors import WorkflowException import cwltool.main @@ -82,6 +83,7 @@ class ArvCwlRunner(object): self.intermediate_output_ttl = 0 self.intermediate_output_collections = [] self.trash_intermediate = False + self.runnable_queue = Queue.Queue() if keep_client is not None: self.keep_client = keep_client @@ -123,23 +125,29 @@ class ArvCwlRunner(object): return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs) def output_callback(self, out, processStatus): - if processStatus == "success": - logger.info("Overall process status is %s", processStatus) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Complete"}).execute(num_retries=self.num_retries) - else: - logger.warn("Overall process status is %s", processStatus) - if self.pipeline: - self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], - body={"state": "Failed"}).execute(num_retries=self.num_retries) - self.final_status = processStatus - self.final_output = out + with self.workflow_eval_lock: + if processStatus == "success": + logger.info("Overall process status is %s", processStatus) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Complete"}).execute(num_retries=self.num_retries) + else: + logger.warn("Overall process status is %s", processStatus) + if self.pipeline: + self.api.pipeline_instances().update(uuid=self.pipeline["uuid"], + body={"state": "Failed"}).execute(num_retries=self.num_retries) + self.final_status = processStatus + self.final_output = out + + def runnable_queue_thread(self): + while True: + runnable, kwargs = self.runnable_queue.get() + runnable.run(**kwargs) def start_run(self, runnable, kwargs): with self.workflow_eval_lock: self.in_flight += 1 - runnable.run(**kwargs) + self.runnable_queue.put((runnable, kwargs)) def process_submitted(self, container): with self.workflow_eval_lock: @@ -151,6 +159,13 @@ class ArvCwlRunner(object): if uuid in self.processes: del self.processes[uuid] + def wrapped_callback(self, cb, obj, st): + with self.workflow_eval_lock: + cb(obj, st) + + def get_wrapped_callback(self, cb): + return partial(self.wrapped_callback, cb) + def on_message(self, event): if "object_uuid" in event: if event["object_uuid"] in self.processes and event["event_type"] == "update": @@ -496,6 +511,8 @@ class ArvCwlRunner(object): self.polling_thread = threading.Thread(target=self.poll_states) self.polling_thread.start() + threading.Thread(target=self.runnable_queue_thread).start() + if runnerjob: jobiter = iter((runnerjob,)) else: diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index ab6dddb45c..eb333d05ae 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -248,6 +248,8 @@ class ArvadosContainer(object): container_request["name"] = wfrecord["name"] container_request["properties"]["template_uuid"] = wfuuid + self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) + try: response = self.arvrunner.api.container_requests().create( body=container_request diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index 0c35115f93..e2df831c46 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -134,6 +134,8 @@ class ArvadosJob(object): if reuse_req: enable_reuse = reuse_req["enableReuse"] + self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) + try: with Perf(metrics, "create %s" % self.name): response = self.arvrunner.api.jobs().create( -- 2.30.2