13108: Refactor task queue into its own class.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 695597f839c1ceee5ccbb67f3391218d3d1a8e34..7affade0734536fb3e7ee241bed9357a995eb949 100644 (file)
@@ -17,13 +17,15 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
+import time
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
-import schema_salad
 from schema_salad.sourceline import SourceLine
 from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
 
 import arvados
 import arvados.config
 
 import arvados
 import arvados.config
@@ -38,12 +40,13 @@ from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -54,17 +57,19 @@ arvados.log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
+DEFAULT_PRIORITY = 500
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 
     """
 
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+    def __init__(self, api_client, work_api=None, keep_client=None,
+                 output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
@@ -80,6 +85,8 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
+        self.thread_count = 4
+        self.poll_interval = 12
 
         if keep_client is not None:
             self.keep_client = keep_client
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -111,8 +118,8 @@ class ArvCwlRunner(object):
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                                num_retries=self.num_retries,
-                                                overrides=kwargs.get("override_tools"))
+                                                num_retries=self.num_retries)
+        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -121,40 +128,58 @@ class ArvCwlRunner(object):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+            self.workflow_eval_lock.notifyAll()
+
+
+    def start_run(self, runnable, kwargs):
+        self.task_queue.add(partial(runnable.run, **kwargs))
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+
+    def process_done(self, uuid):
+        with self.workflow_eval_lock:
+            if uuid in self.processes:
+                del self.processes[uuid]
+
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
 
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
-                    with self.lock:
+                uuid = event["object_uuid"]
+                if event["properties"]["new_attributes"]["state"] == "Running":
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
-                        j.running = True
-                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        if j.running is False:
+                            j.running = True
+                            j.update_pipeline_component(event["properties"]["new_attributes"])
+                            logger.info("%s %s is Running", self.label(j), uuid)
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                    self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -166,15 +191,19 @@ class ArvCwlRunner(object):
         """
 
         try:
         """
 
         try:
+            remain_wait = self.poll_interval
             while True:
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -184,6 +213,7 @@ class ArvCwlRunner(object):
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
                     continue
 
                 for p in proc_states["items"]:
@@ -194,12 +224,13 @@ class ArvCwlRunner(object):
                             "new_attributes": p
                         }
                     })
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notifyAll()
         finally:
             self.stop_polling.set()
 
         finally:
             self.stop_polling.set()
 
@@ -225,18 +256,23 @@ class ArvCwlRunner(object):
 
     def check_features(self, obj):
         if isinstance(obj, dict):
 
     def check_features(self, obj):
         if isinstance(obj, dict):
-            if obj.get("writable"):
-                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+            if obj.get("writable") and self.work_api != "containers":
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
-                    # TODO: can be supported by containers API, but not jobs API.
-                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
-                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+                    if self.work_api != "containers":
+                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+                    if not obj.get("dockerOutputDirectory").startswith('/'):
+                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+                            "Option 'dockerOutputDirectory' must be an absolute path.")
+            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
             for i,v in enumerate(obj):
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
             for i,v in enumerate(obj):
-                with SourceLine(obj, i, UnsupportedRequirement):
+                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                     self.check_features(v)
 
     def make_output_collection(self, name, tagsString, outputObj):
                     self.check_features(v)
 
     def make_output_collection(self, name, tagsString, outputObj):
@@ -280,7 +316,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "listing", "contents"):
+            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                 if k in fileobj:
                     del fileobj[k]
 
                 if k in fileobj:
                     del fileobj[k]
 
@@ -349,7 +385,8 @@ class ArvCwlRunner(object):
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
-
+        self.secret_store = kwargs.get("secret_store")
+        self.thread_count = kwargs.get("thread_count", 4)
 
         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
 
         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
@@ -366,17 +403,17 @@ class ArvCwlRunner(object):
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        override_tools = {}
-        upload_workflow_deps(self, tool, override_tools)
+        merged_map = upload_workflow_deps(self, tool)
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
+        # Don't validate this time because it will just print redundant errors.
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
                                   metadata=tool.metadata,
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
                                   metadata=tool.metadata,
-                                  override_tools=override_tools)
+                                  do_validate=False)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
@@ -390,7 +427,8 @@ class ArvCwlRunner(object):
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs["name"])
+                                      name=kwargs["name"],
+                                      merged_map=merged_map)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
@@ -399,10 +437,12 @@ class ArvCwlRunner(object):
                                         self.project_uuid,
                                         uuid=existing_uuid,
                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
                                         self.project_uuid,
                                         uuid=existing_uuid,
                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs["name"]),
+                                        name=kwargs["name"],
+                                        merged_map=merged_map),
                         "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
                         "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+        self.eval_timeout = kwargs.get("eval_timeout")
 
         kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
@@ -411,15 +451,22 @@ class ArvCwlRunner(object):
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
+            if self.ignore_docker_for_reuse:
+                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
             kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
             kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
+            if kwargs["priority"] != DEFAULT_PRIORITY:
+                raise Exception("--priority not implemented for jobs API.")
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+            raise Exception("--priority must be in the range 1..1000.")
+
         runnerjob = None
         if kwargs.get("submit"):
             # Submit a runner job to run the workflow for us.
         runnerjob = None
         if kwargs.get("submit"):
             # Submit a runner job to run the workflow for us.
@@ -437,7 +484,10 @@ class ArvCwlRunner(object):
                                                 name=kwargs.get("name"),
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"),
                                                 name=kwargs.get("name"),
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"),
-                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
+                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+                                                merged_map=merged_map,
+                                                priority=kwargs.get("priority"),
+                                                secret_store=self.secret_store)
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
@@ -445,7 +495,8 @@ class ArvCwlRunner(object):
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"),
                                       on_error=kwargs.get("on_error"),
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"),
                                       on_error=kwargs.get("on_error"),
-                                      submit_runner_image=kwargs.get("submit_runner_image"))
+                                      submit_runner_image=kwargs.get("submit_runner_image"),
+                                      merged_map=merged_map)
         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
@@ -457,13 +508,15 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run(wait=kwargs.get("wait"))
+            runnerjob.run(**kwargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
+        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
@@ -474,10 +527,11 @@ class ArvCwlRunner(object):
                                **kwargs)
 
         try:
                                **kwargs)
 
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Holds the lock while this code runs and releases it when
+            # it is safe to do so in self.workflow_eval_lock.wait(),
+            # at which point on_message can update job state and
+            # process output callbacks.
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -487,20 +541,25 @@ class ArvCwlRunner(object):
                 if self.stop_polling.is_set():
                     break
 
                 if self.stop_polling.is_set():
                     break
 
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
                 if runnable:
                     with Perf(metrics, "run"):
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
                     else:
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
-                self.cond.wait(1)
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
 
         except UnsupportedRequirement:
             raise
@@ -516,9 +575,11 @@ class ArvCwlRunner(object):
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
+            self.task_queue.drain()
             self.stop_polling.set()
             self.polling_thread.join()
             self.stop_polling.set()
             self.polling_thread.join()
+            self.task_queue.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -553,7 +614,7 @@ def versionstring():
     arvpkg = pkg_resources.require("arvados-python-client")
     cwlpkg = pkg_resources.require("cwltool")
 
     arvpkg = pkg_resources.require("arvados-python-client")
     cwlpkg = pkg_resources.require("cwltool")
 
-    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
+    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                     "arvados-python-client", arvpkg[0].version,
                                     "cwltool", cwlpkg[0].version)
 
                                     "arvados-python-client", arvpkg[0].version,
                                     "cwltool", cwlpkg[0].version)
 
@@ -574,7 +635,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
-    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -656,6 +717,21 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                         default=0)
 
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                         default=0)
 
+    parser.add_argument("--priority", type=int,
+                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+                        default=DEFAULT_PRIORITY)
+
+    parser.add_argument("--disable-validate", dest="do_validate",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--disable-js-validation",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--thread-count", type=int,
+                        default=4, help="Number of threads to use for job submit and output collection.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -664,14 +740,14 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
-    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
-    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
-    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
@@ -692,10 +768,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     job_order_object = None
     arvargs = parser.parse_args(args)
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if arvargs.version:
-        print versionstring()
-        return
-
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
@@ -716,7 +788,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
+            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+            keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
@@ -749,7 +822,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.conformance_test = None
     arvargs.use_container = True
     arvargs.relax_path_checks = True
     arvargs.conformance_test = None
     arvargs.use_container = True
     arvargs.relax_path_checks = True
-    arvargs.validate = None
     arvargs.print_supported_versions = False
 
     make_fs_access = partial(CollectionFsAccess,
     arvargs.print_supported_versions = False
 
     make_fs_access = partial(CollectionFsAccess,