Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 4701b4d8f13a29a2c1dc8f3bc5558de788a5a1fa..131795ee2c0173703a7385c8676d11536ff17398 100644 (file)
@@ -17,32 +17,42 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
+import time
+import signal
+import thread
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
 from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
+import cwltool.argparser
 
 import arvados
 import arvados.config
 from arvados.keep import KeepClient
 from arvados.errors import ApiError
+import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
+
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -53,32 +63,47 @@ arvados.log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
+DEFAULT_PRIORITY = 500
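+# Used as the default for --priority; valid values are 1..1000.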
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+    def __init__(self, api_client,
+                 arvargs=None,
+                 keep_client=None,
+                 num_retries=4,
+                 thread_count=4):
+
+        if arvargs is None:
+            arvargs = argparse.Namespace()
+            arvargs.work_api = None
+            arvargs.output_name = None
+            arvargs.output_tags = None
+            arvargs.thread_count = 1
+
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
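+        # Built on an RLock so code that already holds the lock (for
+        # example, callbacks invoked from the evaluation loop) can
+        # re-acquire it without deadlocking.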
         self.final_output = None
         self.final_status = None
-        self.uploaded = {}
         self.num_retries = num_retries
         self.uuid = None
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.pipeline = None
         self.final_output_collection = None
-        self.output_name = output_name
-        self.output_tags = output_tags
+        self.output_name = arvargs.output_name
+        self.output_tags = arvargs.output_tags
         self.project_uuid = None
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
+        self.thread_count = arvargs.thread_count
+        self.poll_interval = 12
+        self.loadingContext = None
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -87,73 +112,94 @@ class ArvCwlRunner(object):
 
         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
 
+        self.fetcher_constructor = partial(CollectionFetcher,
+                                           api_client=self.api,
+                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                           num_retries=self.num_retries)
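+        # (Shared fetcher for keep: and arvwf: references, reused when
+        # building self.loadingContext below.)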
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
             try:
                 methods = self.api._rootDesc.get('resources')[api]['methods']
                 if ('httpMethod' in methods['create'] and
-                    (work_api == api or work_api is None)):
+                    (arvargs.work_api == api or arvargs.work_api is None)):
                     self.work_api = api
                     break
             except KeyError:
                 pass
 
         if not self.work_api:
-            if work_api is None:
+            if arvargs.work_api is None:
                 raise Exception("No supported APIs")
             else:
                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
 
-    def arv_make_tool(self, toolpath_object, **kwargs):
-        kwargs["work_api"] = self.work_api
-        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
-                                                api_client=self.api,
-                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                                num_retries=self.num_retries)
-        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
+        self.loadingContext = ArvLoadingContext(vars(arvargs))
+        self.loadingContext.fetcher_constructor = self.fetcher_constructor
+        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+        self.loadingContext.construct_tool_object = self.arv_make_tool
+
+    def arv_make_tool(self, toolpath_object, loadingContext):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, loadingContext)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
-            return ArvadosWorkflow(self, toolpath_object, **kwargs)
+            return ArvadosWorkflow(self, toolpath_object, loadingContext)
         else:
-            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.error("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+            self.workflow_eval_lock.notifyAll()
+
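+    # Run the runnable on a TaskQueue worker thread so container/job
+    # submission does not block the main evaluation loop.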
+    def start_run(self, runnable, runtimeContext):
+        self.task_queue.add(partial(runnable.run, runtimeContext))
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+
+    def process_done(self, uuid, record):
+        with self.workflow_eval_lock:
+            j = self.processes[uuid]
+            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+            self.task_queue.add(partial(j.done, record))
+            del self.processes[uuid]
+
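+    # Run a cwltool output callback while holding workflow_eval_lock,
+    # then wake any thread blocked in workflow_eval_lock.wait().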
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
-        if "object_uuid" in event:
-            if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
-                    with self.lock:
-                        j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
-                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
-                        j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                self.process_done(uuid, event["properties"]["new_attributes"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -165,15 +211,19 @@ class ArvCwlRunner(object):
         """
 
         try:
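+            # remain_wait subtracts the time spent polling from the next
+            # sleep so the effective cadence stays close to poll_interval.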
+            remain_wait = self.poll_interval
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -183,6 +233,7 @@ class ArvCwlRunner(object):
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
@@ -193,21 +244,16 @@ class ArvCwlRunner(object):
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notifyAll()
         finally:
             self.stop_polling.set()
 
-    def get_uploaded(self):
-        return self.uploaded.copy()
-
-    def add_uploaded(self, src, pair):
-        self.uploaded[src] = pair
-
     def add_intermediate_output(self, uuid):
         if uuid:
             self.intermediate_output_collections.append(uuid)
@@ -219,7 +265,7 @@ class ArvCwlRunner(object):
                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
             except:
                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-            if sys.exc_info()[0] is KeyboardInterrupt:
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                 break
 
     def check_features(self, obj):
@@ -234,6 +280,8 @@ class ArvCwlRunner(object):
                     if not obj.get("dockerOutputDirectory").startswith('/'):
                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                             "Option 'dockerOutputDirectory' must be an absolute path.")
+            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
@@ -241,7 +289,7 @@ class ArvCwlRunner(object):
                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                     self.check_features(v)
 
-    def make_output_collection(self, name, tagsString, outputObj):
+    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
         outputObj = copy.deepcopy(outputObj)
 
         files = []
@@ -292,7 +340,7 @@ class ArvCwlRunner(object):
         with final.open("cwl.output.json", "w") as f:
             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
 
-        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
 
         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                     final.api_response()["name"],
@@ -341,30 +389,31 @@ class ArvCwlRunner(object):
                                        'progress':1.0
                                    }).execute(num_retries=self.num_retries)
 
-    def arv_executor(self, tool, job_order, **kwargs):
-        self.debug = kwargs.get("debug")
+    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
+        self.debug = runtimeContext.debug
 
         tool.visit(self.check_features)
 
-        self.project_uuid = kwargs.get("project_uuid")
+        self.project_uuid = runtimeContext.project_uuid
         self.pipeline = None
-        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 collection_cache=self.collection_cache)
-        self.fs_access = make_fs_access(kwargs["basedir"])
+        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+        self.secret_store = runtimeContext.secret_store
 
-
-        self.trash_intermediate = kwargs["trash_intermediate"]
+        self.trash_intermediate = runtimeContext.trash_intermediate
         if self.trash_intermediate and self.work_api != "containers":
             raise Exception("--trash-intermediate is only supported with --api=containers.")
 
-        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
         if self.intermediate_output_ttl and self.work_api != "containers":
             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
         if self.intermediate_output_ttl < 0:
             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
 
-        if not kwargs.get("name"):
-            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+        if runtimeContext.submit_request_uuid and self.work_api != "containers":
+            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
+        if not runtimeContext.name:
+            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
@@ -372,25 +421,29 @@ class ArvCwlRunner(object):
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
+        # Don't validate this time because it will just print redundant errors.
+        loadingContext = self.loadingContext.copy()
+        loadingContext.loader = tool.doc_loader
+        loadingContext.avsc_names = tool.doc_schema
+        loadingContext.metadata = tool.metadata
+        loadingContext.do_validate = False
+
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
-                                  makeTool=self.arv_make_tool,
-                                  loader=tool.doc_loader,
-                                  avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata)
+                                  loadingContext)
 
         # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % kwargs["name"],
+        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                      tool, job_order)
 
-        existing_uuid = kwargs.get("update_workflow")
-        if existing_uuid or kwargs.get("create_workflow"):
+        existing_uuid = runtimeContext.update_workflow
+        if existing_uuid or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
-                                      kwargs.get("enable_reuse"),
+                                      runtimeContext.enable_reuse,
                                       uuid=existing_uuid,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs["name"],
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
                                       merged_map=merged_map)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
@@ -399,92 +452,103 @@ class ArvCwlRunner(object):
                 return (upload_workflow(self, tool, job_order,
                                         self.project_uuid,
                                         uuid=existing_uuid,
-                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs["name"],
+                                        submit_runner_ram=runtimeContext.submit_runner_ram,
+                                        name=runtimeContext.name,
                                         merged_map=merged_map),
                         "success")
 
-        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.eval_timeout = kwargs.get("eval_timeout")
+        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+        self.eval_timeout = runtimeContext.eval_timeout
 
-        kwargs["make_fs_access"] = make_fs_access
-        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-        kwargs["use_container"] = True
-        kwargs["tmpdir_prefix"] = "tmp"
-        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.use_container = True
+        runtimeContext.tmpdir_prefix = "tmp"
+        runtimeContext.work_api = self.work_api
 
         if self.work_api == "containers":
             if self.ignore_docker_for_reuse:
-                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["docker_outdir"] = "/var/spool/cwl"
-            kwargs["tmpdir"] = "/tmp"
-            kwargs["docker_tmpdir"] = "/tmp"
+                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
+            runtimeContext.outdir = "/var/spool/cwl"
+            runtimeContext.docker_outdir = "/var/spool/cwl"
+            runtimeContext.tmpdir = "/tmp"
+            runtimeContext.docker_tmpdir = "/tmp"
         elif self.work_api == "jobs":
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["docker_outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
+            if runtimeContext.priority != DEFAULT_PRIORITY:
+                raise Exception("--priority not implemented for jobs API.")
+            runtimeContext.outdir = "$(task.outdir)"
+            runtimeContext.docker_outdir = "$(task.outdir)"
+            runtimeContext.tmpdir = "$(task.tmpdir)"
+
+        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
+            raise Exception("--priority must be in the range 1..1000.")
 
         runnerjob = None
-        if kwargs.get("submit"):
+        if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
-                    kwargs["runnerjob"] = tool.tool["id"]
+                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+                    runtimeContext.runnerjob = tool.tool["id"]
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
-                                         **kwargs).next()
+                                         runtimeContext).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
                                                 self.output_name,
                                                 self.output_tags,
-                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                                name=kwargs.get("name"),
-                                                on_error=kwargs.get("on_error"),
-                                                submit_runner_image=kwargs.get("submit_runner_image"),
-                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
-                                                merged_map=merged_map)
+                                                submit_runner_ram=runtimeContext.submit_runner_ram,
+                                                name=runtimeContext.name,
+                                                on_error=runtimeContext.on_error,
+                                                submit_runner_image=runtimeContext.submit_runner_image,
+                                                intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
+                                                merged_map=merged_map,
+                                                priority=runtimeContext.priority,
+                                                secret_store=self.secret_store)
             elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                       self.output_name,
                                       self.output_tags,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs.get("name"),
-                                      on_error=kwargs.get("on_error"),
-                                      submit_runner_image=kwargs.get("submit_runner_image"),
+                                      submit_runner_ram=runtimeContext.submit_runner_ram,
+                                      name=runtimeContext.name,
+                                      on_error=runtimeContext.on_error,
+                                      submit_runner_image=runtimeContext.submit_runner_image,
                                       merged_map=merged_map)
-        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
-                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if runnerjob and not kwargs.get("wait"):
-            runnerjob.run(wait=kwargs.get("wait"))
+        if runnerjob and not runtimeContext.wait:
+            submitargs = runtimeContext.copy()
+            submitargs.submit = False
+            runnerjob.run(submitargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
+        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
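+        # The worker pool shares workflow_eval_lock with the evaluation
+        # loop; job submission and output collection run on its threads.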
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
-            if "cwl_runner_job" in kwargs:
-                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+            if runtimeContext.cwl_runner_job is not None:
+                self.uuid = runtimeContext.cwl_runner_job.get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
-                               **kwargs)
+                               runtimeContext)
 
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Holds the lock while this code runs; it is released only
+            # inside self.workflow_eval_lock.wait(), at which point
+            # on_message can update process state and output callbacks
+            # can run.
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -494,26 +558,31 @@ class ArvCwlRunner(object):
                 if self.stop_polling.is_set():
                     break
 
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, runtimeContext)
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
-                self.cond.wait(1)
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
         except:
-            if sys.exc_info()[0] is KeyboardInterrupt:
-                logger.error("Interrupted, marking pipeline as failed")
+            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+                logger.error("Interrupted, workflow will be cancelled")
             else:
                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
@@ -523,9 +592,11 @@ class ArvCwlRunner(object):
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
+            self.task_queue.drain()
             self.stop_polling.set()
             self.polling_thread.join()
+            self.task_queue.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -533,17 +604,19 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if kwargs.get("submit") and isinstance(runnerjob, Runner):
+        if runtimeContext.submit and isinstance(runnerjob, Runner):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
             if self.output_tags is None:
                 self.output_tags = ""
-            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+
+            storage_classes = runtimeContext.storage_classes.strip().split(",")
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
             self.set_crunch_output()
 
-        if kwargs.get("compute_checksum"):
+        if runtimeContext.compute_checksum:
             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
@@ -581,7 +654,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
-    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -641,12 +714,16 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--submit-runner-ram", type=int,
                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
-                        default=1024)
+                        default=None)
 
     parser.add_argument("--submit-runner-image", type=str,
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
+    parser.add_argument("--submit-request-uuid", type=str,
+                        default=None,
+                        help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
@@ -658,11 +735,28 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--enable-dev", action="store_true",
                         help="Enable loading and running development versions "
                              "of CWL spec.", default=False)
+    parser.add_argument('--storage-classes', default="default", type=str,
+                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
 
     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                         default=0)
 
+    parser.add_argument("--priority", type=int,
+                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+                        default=DEFAULT_PRIORITY)
+
+    parser.add_argument("--disable-validate", dest="do_validate",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--disable-js-validation",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--thread-count", type=int,
+                        default=4, help="Number of threads to use for job submit and output collection.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -671,14 +765,14 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
-    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
-    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
-    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
@@ -693,15 +787,27 @@ def add_arv_hints():
         "http://arvados.org/cwl#ReuseRequirement"
     ])
 
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def exit_signal_handler(sigcode, frame):
+    logger.error("Caught signal {}, exiting.".format(sigcode))
+    sys.exit(-sigcode)
+
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+         install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if arvargs.version:
-        print versionstring()
-        return
+    if len(arvargs.storage_classes.strip().split(',')) > 1:
+        logger.error("Multiple storage classes are not supported currently.")
+        return 1
+
+    arvargs.use_container = True
+    arvargs.relax_path_checks = True
+    arvargs.print_supported_versions = False
+
+    if install_sig_handlers:
+        arv_cmd.install_signal_handlers()
 
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
@@ -723,12 +829,13 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
+            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+            keep_client = api_client.keep
+            # Hit the API now so configuration or credential errors are
+            # reported early.
+            api_client.users().current().execute()
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
-                              num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags)
+        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
     except Exception as e:
         logger.error(e)
         return 1
@@ -753,26 +860,21 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     else:
         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
 
-    arvargs.conformance_test = None
-    arvargs.use_container = True
-    arvargs.relax_path_checks = True
-    arvargs.print_supported_versions = False
+    for key, val in cwltool.argparser.get_default_args().items():
+        if not hasattr(arvargs, key):
+            setattr(arvargs, key, val)
 
-    make_fs_access = partial(CollectionFsAccess,
-                           collection_cache=runner.collection_cache)
+    runtimeContext = ArvRuntimeContext(vars(arvargs))
+    runtimeContext.make_fs_access = partial(CollectionFsAccess,
+                                            collection_cache=runner.collection_cache)
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
                              executor=runner.arv_executor,
-                             makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=make_fs_access,
-                             fetcher_constructor=partial(CollectionFetcher,
-                                                         api_client=api_client,
-                                                         fs_access=make_fs_access(""),
-                                                         num_retries=runner.num_retries),
-                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                              logger_handler=arvados.log_handler,
-                             custom_schema_callback=add_arv_hints)
+                             custom_schema_callback=add_arv_hints,
+                             loadingContext=runner.loadingContext,
+                             runtimeContext=runtimeContext)
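
For reference, the evaluation loop above relies on the TaskQueue imported from .task_queue, which is added elsewhere on this branch and not shown in this diff. The interface it must provide is visible above (constructor taking the shared lock and a thread count, add(), in_flight, error, drain(), join()); the worker-loop and sentinel-shutdown details in the following minimal sketch are assumptions, not the committed implementation.

import Queue
import threading

class TaskQueue(object):
    def __init__(self, lock, thread_count):
        self.lock = lock          # the shared workflow_eval_lock (a Condition)
        self.in_flight = 0        # tasks queued or running
        self.error = None         # first exception raised by a task
        self.task_queue = Queue.Queue()
        self.task_queue_threads = []
        for r in xrange(0, thread_count):
            t = threading.Thread(target=self.task_queue_func)
            self.task_queue_threads.append(t)
            t.daemon = True
            t.start()

    def task_queue_func(self):
        while True:
            task = self.task_queue.get()
            if task is None:
                return
            try:
                task()
            except Exception as e:
                with self.lock:
                    self.error = e
            with self.lock:
                self.in_flight -= 1
                self.lock.notifyAll()   # wake workflow_eval_lock.wait()

    def add(self, task):
        with self.lock:
            self.in_flight += 1
        self.task_queue.put(task)

    def drain(self):
        # Discard tasks that were queued but never started.
        try:
            while not self.task_queue.empty():
                self.task_queue.get(True, .1)
        except Queue.Empty:
            pass

    def join(self):
        # One sentinel per worker thread, then wait for each to exit.
        for t in self.task_queue_threads:
            self.task_queue.put(None)
        for t in self.task_queue_threads:
            t.join()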