Merge branch 'master' into 7478-anm-spot-instances
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index a0b71723c13163011f9d153454de0f9fb0528bee..d509f400f1058396f2fc91e6ef320a2bbebe92e1 100644 (file)
@@ -17,6 +17,10 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
+import time
+import signal
+import thread
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -29,6 +33,7 @@ import arvados
 import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
 import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
+import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
@@ -38,6 +43,7 @@ from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
 from ._version import __version__
 
 from cwltool.pack import pack
 from ._version import __version__
 
 from cwltool.pack import pack
@@ -62,11 +68,12 @@ class ArvCwlRunner(object):
 
     """
 
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+    def __init__(self, api_client, work_api=None, keep_client=None,
+                 output_name=None, output_tags=None, num_retries=4,
+                 thread_count=4):
         self.api = api_client
         self.processes = {}
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
@@ -82,6 +89,8 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
+        self.thread_count = thread_count
+        self.poll_interval = 12
 
         if keep_client is not None:
             self.keep_client = keep_client
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -90,6 +99,11 @@ class ArvCwlRunner(object):
 
         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
 
 
         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
 
+        self.fetcher_constructor = partial(CollectionFetcher,
+                                           api_client=self.api,
+                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                           num_retries=self.num_retries)
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
@@ -110,10 +124,7 @@ class ArvCwlRunner(object):
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
-        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
-                                                api_client=self.api,
-                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                                num_retries=self.num_retries)
+        kwargs["fetcher_constructor"] = self.fetcher_constructor
         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
@@ -123,40 +134,56 @@ class ArvCwlRunner(object):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+            self.workflow_eval_lock.notifyAll()
+
+
+    def start_run(self, runnable, kwargs):
+        self.task_queue.add(partial(runnable.run, **kwargs))
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+
+    def process_done(self, uuid, record):
+        with self.workflow_eval_lock:
+            j = self.processes[uuid]
+            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+            self.task_queue.add(partial(j.done, record))
+            del self.processes[uuid]
+
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
 
     def on_message(self, event):
-        if "object_uuid" in event:
-            if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
-                    with self.lock:
-                        j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
-                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
-                        j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                self.process_done(uuid, event["properties"]["new_attributes"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -168,15 +195,19 @@ class ArvCwlRunner(object):
         """
 
         try:
         """
 
         try:
+            remain_wait = self.poll_interval
             while True:
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -186,6 +217,7 @@ class ArvCwlRunner(object):
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
                     continue
 
                 for p in proc_states["items"]:
@@ -196,12 +228,13 @@ class ArvCwlRunner(object):
                             "new_attributes": p
                         }
                     })
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notifyAll()
         finally:
             self.stop_polling.set()
 
         finally:
             self.stop_polling.set()
 
@@ -222,7 +255,7 @@ class ArvCwlRunner(object):
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
             except:
                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
             except:
                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt:
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 break
 
     def check_features(self, obj):
                 break
 
     def check_features(self, obj):
@@ -478,13 +511,17 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run(wait=kwargs.get("wait"))
+            submitargs = kwargs.copy()
+            submitargs['submit'] = False
+            runnerjob.run(**submitargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
+        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
@@ -495,10 +532,11 @@ class ArvCwlRunner(object):
                                **kwargs)
 
         try:
                                **kwargs)
 
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Holds the lock while this code runs and releases it when
+            # it is safe to do so in self.workflow_eval_lock.wait(),
+            # at which point on_message can update job state and
+            # process output callbacks.
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -508,26 +546,31 @@ class ArvCwlRunner(object):
                 if self.stop_polling.is_set():
                     break
 
                 if self.stop_polling.is_set():
                     break
 
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
                 if runnable:
                     with Perf(metrics, "run"):
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
                     else:
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
-                self.cond.wait(1)
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
         except:
 
         except UnsupportedRequirement:
             raise
         except:
-            if sys.exc_info()[0] is KeyboardInterrupt:
-                logger.error("Interrupted, marking pipeline as failed")
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                logger.error("Interrupted, workflow will be cancelled")
             else:
                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
             else:
                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
@@ -537,9 +580,11 @@ class ArvCwlRunner(object):
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
+            self.task_queue.drain()
             self.stop_polling.set()
             self.polling_thread.join()
             self.stop_polling.set()
             self.polling_thread.join()
+            self.task_queue.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -689,6 +734,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--thread-count", type=int,
+                        default=4, help="Number of threads to use for job submit and output collection.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -719,12 +767,20 @@ def add_arv_hints():
         "http://arvados.org/cwl#ReuseRequirement"
     ])
 
         "http://arvados.org/cwl#ReuseRequirement"
     ])
 
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def exit_signal_handler(sigcode, frame):
+    logger.error("Caught signal {}, exiting.".format(sigcode))
+    sys.exit(-sigcode)
+
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+         install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
+    if install_sig_handlers:
+        arv_cmd.install_signal_handlers()
+
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
@@ -745,12 +801,14 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
+            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+            keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                               num_retries=4, output_name=arvargs.output_name,
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                               num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags)
+                              output_tags=arvargs.output_tags,
+                              thread_count=arvargs.thread_count)
     except Exception as e:
         logger.error(e)
         return 1
     except Exception as e:
         logger.error(e)
         return 1