13108: Work queue submit wip
author Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 5 Apr 2018 21:10:25 +0000 (17:10 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 5 Apr 2018 21:10:25 +0000 (17:10 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py

index fbef5347e60cc584f533ef2effc1f40a82ec91da..73901555acc4a3cc115fe9187abca80c34cbc75d 100644 (file)
@@ -17,6 +17,7 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -82,6 +83,7 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
+        self.runnable_queue = Queue.Queue()
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -123,23 +125,29 @@ class ArvCwlRunner(object):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+
+    def runnable_queue_thread(self):
+        while True:
+            runnable, kwargs = self.runnable_queue.get()
+            runnable.run(**kwargs)
 
     def start_run(self, runnable, kwargs):
         with self.workflow_eval_lock:
             self.in_flight += 1
-        runnable.run(**kwargs)
+        self.runnable_queue.put((runnable, kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -151,6 +159,13 @@ class ArvCwlRunner(object):
             if uuid in self.processes:
                 del self.processes[uuid]
 
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
+
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
@@ -496,6 +511,8 @@ class ArvCwlRunner(object):
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
+        threading.Thread(target=self.runnable_queue_thread).start()
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
index ab6dddb45c078ee835c528c9ecc96b4de8fd264b..eb333d05ae17aee122f9e490644b283abf795454 100644 (file)
@@ -248,6 +248,8 @@ class ArvadosContainer(object):
                 container_request["name"] = wfrecord["name"]
             container_request["properties"]["template_uuid"] = wfuuid
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             response = self.arvrunner.api.container_requests().create(
                 body=container_request
index 0c35115f9365bd27b03da9e0af7cf0c683e95660..e2df831c4626b9b81f75c7ae64de945d54322d5d 100644 (file)
@@ -134,6 +134,8 @@ class ArvadosJob(object):
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             with Perf(metrics, "create %s" % self.name):
                 response = self.arvrunner.api.jobs().create(