100032: Add metrics to job submission in --debug mode.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 533d24a6981177e2a5d6ba916b367fd0bb1a2e73..7bfdba8b80e48d9cb24d7054933b428aa3729764 100644 (file)
 #!/usr/bin/env python
 
+# Implement cwl-runner interface for submitting and running work on Arvados, using
+# either the Crunch jobs API or Crunch containers API.
+
 import argparse
-import arvados
-import arvados.events
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.collection
-import arvados.util
-import cwltool.draft2tool
-import cwltool.workflow
-import cwltool.main
-from cwltool.process import shortname
-import threading
-import cwltool.docker
-import fnmatch
 import logging
-import re
 import os
 import sys
+import threading
+import hashlib
+from functools import partial
+import pkg_resources  # part of setuptools
+
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
 
-from cwltool.process import get_feature
+import arvados
+import arvados.config
+
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
+
+from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
-
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
-    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
-        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
-    sp = dockerRequirement["dockerImageId"].split(":")
-    image_name = sp[0]
-    image_tag = sp[1] if len(sp) > 1 else None
-
-    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
-                                                            image_name=image_name,
-                                                            image_tag=image_tag)
-
-    if not images:
-        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = ["--project-uuid="+project_uuid, image_name]
-        if image_tag:
-            args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args)
-
-    return dockerRequirement["dockerImageId"]
-
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
-    def __init__(self, basedir):
-        self.collections = {}
-        self.basedir = basedir
-
-    def get_collection(self, path):
-        p = path.split("/")
-        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
-            pdh = p[0][5:]
-            if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
-            return (self.collections[pdh], "/".join(p[1:]))
-        else:
-            return (None, path)
-
-    def _match(self, collection, patternsegments, parent):
-        if not patternsegments:
-            return []
-
-        if not isinstance(collection, arvados.collection.RichCollectionBase):
-            return []
-
-        ret = []
-        # iterate over the files and subcollections in 'collection'
-        for filename in collection:
-            if patternsegments[0] == '.':
-                # Pattern contains something like "./foo" so just shift
-                # past the "./"
-                ret.extend(self._match(collection, patternsegments[1:], parent))
-            elif fnmatch.fnmatch(filename, patternsegments[0]):
-                cur = os.path.join(parent, filename)
-                if len(patternsegments) == 1:
-                    ret.append(cur)
-                else:
-                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
-        return ret
-
-    def glob(self, pattern):
-        collection, rest = self.get_collection(pattern)
-        patternsegments = rest.split("/")
-        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
-    def open(self, fn, mode):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.open(rest, mode)
-        else:
-            return open(self._abs(fn), mode)
-
-    def exists(self, fn):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.exists(rest)
-        else:
-            return os.path.exists(self._abs(fn))
-
-class ArvadosJob(object):
-    def __init__(self, runner):
-        self.arvrunner = runner
-        self.running = False
-
-    def run(self, dry_run=False, pull_image=True, **kwargs):
-        script_parameters = {
-            "command": self.command_line
-        }
-        runtime_constraints = {}
-
-        if self.generatefiles:
-            vwd = arvados.collection.Collection()
-            script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
-        if self.environment:
-            script_parameters["task.env"].update(self.environment)
-
-        if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
-        if self.stdout:
-            script_parameters["task.stdout"] = self.stdout
-
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-        if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
-
-        resources = self.builder.resources
-        if resources is not None:
-            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
-            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
-            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
-        try:
-            response = self.arvrunner.api.jobs().create(body={
-                "owner_uuid": self.arvrunner.project_uuid,
-                "script": "crunchrunner",
-                "repository": "arvados",
-                "script_version": "master",
-                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
-
-            self.arvrunner.jobs[response["uuid"]] = self
-
-            self.arvrunner.pipeline["components"][self.name] = {"job": response}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                                                                                     body={
-                                                                                         "components": self.arvrunner.pipeline["components"]
-                                                                                     }).execute(num_retries=self.arvrunner.num_retries)
-
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
-        except Exception as e:
-            logger.error("Got error %s" % str(e))
-            self.output_callback({}, "permanentFail")
-
-    def update_pipeline_component(self, record):
-        self.arvrunner.pipeline["components"][self.name] = {"job": record}
-        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                                                                                 body={
-                                                                                    "components": self.arvrunner.pipeline["components"]
-                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
-
-    def done(self, record):
-        try:
-            self.update_pipeline_component(record)
-        except:
-            pass
-
-        try:
-            if record["state"] == "Complete":
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-
-            try:
-                outputs = {}
-                if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log.readlines():
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-                        if tmpdir and outdir and keepdir:
-                            break
-
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
-            except Exception as e:
-                logger.exception("Got exception while collecting job outputs:")
-                processStatus = "permanentFail"
-
-            self.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
-        self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = []
-
-        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
-        for src in referenced_files:
-            if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
-            if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
-                if kwargs.get("conformance_test"):
-                    self._pathmap[src] = (src, ab)
-                elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab, st))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                else:
-                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
-        if uploadfiles:
-            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
-                                             arvrunner.api,
-                                             dry_run=kwargs.get("dry_run"),
-                                             num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s",
-                                             project=arvrunner.project_uuid)
-
-        for src, ab, st in uploadfiles:
-            arvrunner.add_uploaded(src, (ab, st.fn))
-            self._pathmap[src] = (ab, st.fn)
-
-        self.keepdir = None
-
-    def reversemap(self, target):
-        if target.startswith("keep:"):
-            return (target, target)
-        elif self.keepdir and target.startswith(self.keepdir):
-            return (target, "keep:" + target[len(self.keepdir)+1:])
-        else:
-            return super(ArvPathMapper, self).reversemap(target)
-
-
-class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
-        self.arvrunner = arvrunner
-
-    def makeJobRunner(self):
-        return ArvadosJob(self.arvrunner)
-
-    def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+class ArvCwlRunner(object):
+    """Execute a CWL tool or workflow, submit work (using either jobs or
+    containers API), wait for them to complete, and report output.
 
+    """
 
-class ArvCwlRunner(object):
-    def __init__(self, api_client):
+    def __init__(self, api_client, work_api=None):
+        """Initialize runner state and select the work submission API.
+
+        api_client -- API client object, stored as self.api and used for
+        all API calls made by this runner.
+
+        work_api -- "jobs" or "containers"; None currently falls back to
+        "jobs".
+
+        Raises Exception if work_api is any other value.
+        """
         self.api = api_client
-        self.jobs = {}
+        # Submitted jobs/containers that have not finished yet, keyed by uuid.
+        self.processes = {}
         self.lock = threading.Lock()
+        # The condition shares self.lock, so holding either one holds both.
         self.cond = threading.Condition(self.lock)
         self.final_output = None
+        # Set by output_callback together with final_output.
+        self.final_status = None
         self.uploaded = {}
         self.num_retries = 4
+        self.uuid = None
+        self.work_api = work_api
+        # Set in arvExecutor's finally clause to make poll_states exit.
+        self.stop_polling = threading.Event()
+        # Assigned in arvExecutor just before the polling thread starts.
+        self.poll_api = None
+
+        if self.work_api is None:
+            # todo: autodetect API to use.
+            self.work_api = "jobs"
+
+        if self.work_api not in ("containers", "jobs"):
+            raise Exception("Unsupported API '%s'" % self.work_api)
 
     def arvMakeTool(self, toolpath_object, **kwargs):
+        """Build the tool object for toolpath_object.
+
+        Objects whose "class" is "CommandLineTool" become an
+        ArvadosCommandTool bound to this runner, with the selected work_api
+        passed along in kwargs; everything else is delegated to
+        cwltool.workflow.defaultMakeTool.
+        """
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
+            kwargs["work_api"] = self.work_api
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
+        """Record the overall result of the run.
+
+        Logs processStatus, marks the pipeline instance "Complete" (on
+        "success") or "Failed" (any other status) when this runner created
+        one, then stores processStatus in self.final_status and out in
+        self.final_output.
+        """
         if processStatus == "success":
-            logger.info("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
-
+            logger.info("Overall process status is %s", processStatus)
+            # self.pipeline stays None unless arvExecutor created a pipeline instance.
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
         else:
-            logger.warn("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            logger.warn("Overall process status is %s", processStatus)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        self.final_status = processStatus
         self.final_output = out
 
-
     def on_message(self, event):
         if "object_uuid" in event:
-            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+            if event["object_uuid"] in self.processes and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                     uuid = event["object_uuid"]
                     with self.lock:
-                        j = self.jobs[uuid]
+                        j = self.processes[uuid]
                         logger.info("Job %s (%s) is Running", j.name, uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
@@ -337,107 +96,236 @@ class ArvCwlRunner(object):
                     uuid = event["object_uuid"]
                     try:
                         self.cond.acquire()
-                        j = self.jobs[uuid]
+                        j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def poll_states(self):
+        """Periodically fetch the state of every tracked job or container.
+
+        Intended to run in its own thread.  Every 15 seconds (or sooner if
+        stop_polling is set) the uuids in the processes dict are looked up
+        through poll_api, and each returned record is fed to on_message as
+        if it were an "update" event.  Returns once stop_polling is set.
+        """
+
+        while True:
+            # Sleep for up to 15 seconds; a set() on the event wakes us early.
+            self.stop_polling.wait(15)
+            if self.stop_polling.is_set():
+                return
+
+            # Snapshot the uuids under the lock, then query without holding it.
+            with self.lock:
+                tracked_uuids = self.processes.keys()
+            if not tracked_uuids:
+                continue
+
+            if self.work_api == "containers":
+                resource = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                resource = self.poll_api.jobs()
+
+            uuid_filter = [["uuid", "in", tracked_uuids]]
+            try:
+                listing = resource.list(filters=uuid_filter).execute(num_retries=self.num_retries)
+            except Exception as err:
+                # Not fatal: log it and try again on the next cycle.
+                logger.warn("Error checking states on API server: %s", err)
+                continue
+
+            for record in listing["items"]:
+                synthetic_event = {
+                    "object_uuid": record["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": record
+                    }
+                }
+                self.on_message(synthetic_event)
+
     def get_uploaded(self):
+        """Return a shallow copy of the self.uploaded mapping."""
         return self.uploaded.copy()
 
     def add_uploaded(self, src, pair):
+        """Store pair in the self.uploaded mapping under the key src."""
         self.uploaded[src] = pair
 
-    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
-
-        try:
-            self.api.collections().get(uuid=crunchrunner_pdh).execute()
-        except arvados.errors.ApiError as e:
-            import httplib2
-            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
-            resp, content = h.request(crunchrunner_download, "GET")
-            resp2, content2 = h.request(certs_download, "GET")
-            with arvados.collection.Collection() as col:
-                with col.open("crunchrunner", "w") as f:
-                    f.write(content)
-                with col.open("ca-certificates.crt", "w") as f:
-                    f.write(content2)
+    def check_writable(self, obj):
+        """Reject any truthy "writable" entry found anywhere inside obj.
+
+        Walks dicts (by value) and lists recursively; other values are
+        ignored.  Raises UnsupportedRequirement as soon as a dict with a
+        truthy "writable" key is encountered.
+        """
+        if isinstance(obj, list):
+            children = obj
+        elif isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            children = obj.itervalues()
+        else:
+            # Scalars and anything else cannot contain nested entries.
+            return
+        for child in children:
+            self.check_writable(child)
 
-                col.save_new("crunchrunner binary", ensure_unique_name=True)
+    def arvExecutor(self, tool, job_order, **kwargs):
+        """Run or submit tool with the inputs in job_order.
+
+        kwargs must contain "basedir"; the other options are read with
+        kwargs.get().  Depending on the options, the return value is:
+
+        * the uuid of a new pipeline template (create_template),
+        * the result of make_workflow (create_workflow / update_workflow),
+        * the runner's uuid (submit without wait), or
+        * self.final_output, once every tracked process has finished.
+
+        Raises UnsupportedRequirement if the tool uses an unsupported
+        feature, and WorkflowException if the final status is not "success"
+        or no output was produced.
+        """
+        self.debug = kwargs.get("debug")
 
-        self.fs_access = CollectionFsAccess(input_basedir)
+        # Fail before doing any work if check_writable rejects part of the tool.
+        tool.visit(self.check_writable)
 
-        kwargs["fs_access"] = self.fs_access
-        kwargs["enable_reuse"] = args.enable_reuse
+        if kwargs.get("quiet"):
+            logger.setLevel(logging.WARN)
+            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
-        kwargs["outdir"] = "$(task.outdir)"
-        kwargs["tmpdir"] = "$(task.tmpdir)"
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
 
         useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+        # Without an explicit project_uuid, use the current user's uuid instead.
+        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.pipeline = None
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
+
+        if kwargs.get("create_template"):
+            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
+            tmpl.save()
+            # cwltool.main will write our return value to stdout.
+            return tmpl.uuid
+
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
+        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+
+        kwargs["make_fs_access"] = make_fs_access
+        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+        kwargs["use_container"] = True
+        kwargs["tmpdir_prefix"] = "tmp"
+        kwargs["on_error"] = "continue"
+        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+
+        # Output and temp directory settings differ between the two work APIs.
+        if self.work_api == "containers":
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["docker_outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
+        elif self.work_api == "jobs":
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["docker_outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
+
+        # With "submit" set, a single runner object stands in for the whole
+        # workflow; otherwise runnerjob stays None and the steps produced by
+        # tool.job() are run from this process.
+        runnerjob = None
+        if kwargs.get("submit"):
+            if self.work_api == "containers":
+                if tool.tool["class"] == "CommandLineTool":
+                    runnerjob = tool.job(job_order,
+                                         self.output_callback,
+                                         **kwargs).next()
+                else:
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+            else:
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
-        if kwargs.get("conformance_test"):
-            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
-        else:
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+            # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
                     "name": shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
-
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
+        # Submit-and-exit: start the runner and return its uuid; no polling
+        # thread is started on this path.
+        if runnerjob and not kwargs.get("wait"):
+            runnerjob.run()
+            return runnerjob.uuid
+
+        # poll_states runs in its own thread and uses its own API client object.
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
+
+        if runnerjob:
+            jobiter = iter((runnerjob,))
+        else:
+            if "cwl_runner_job" in kwargs:
+                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
             jobiter = tool.job(job_order,
-                               input_basedir,
                                self.output_callback,
-                               docker_outdir="$(task.outdir)",
                                **kwargs)
 
-            try:
-                self.cond.acquire()
-                # Will continue to hold the lock for the duration of this code
-                # except when in cond.wait(), at which point on_message can update
-                # job state and process output callbacks.
-
-                for runnable in jobiter:
-                    if runnable:
+        try:
+            self.cond.acquire()
+            # Will continue to hold the lock for the duration of this code
+            # except when in cond.wait(), at which point on_message can update
+            # job state and process output callbacks.
+
+            for runnable in jobiter:
+                if runnable:
+                    with Perf(logger, "run"):
                         runnable.run(**kwargs)
+                else:
+                    if self.processes:
+                        self.cond.wait(1)
                     else:
-                        if self.jobs:
-                            self.cond.wait(1)
-                        else:
-                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
-                            break
-
-                while self.jobs:
-                    self.cond.wait(1)
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        break
 
-                events.close()
+            # Everything has been started; wait for the remaining processes.
+            while self.processes:
+                self.cond.wait(1)
 
-                if self.final_output is None:
-                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
-
-                # create final output collection
-            except:
-                if sys.exc_info()[0] is KeyboardInterrupt:
-                    logger.error("Interrupted, marking pipeline as failed")
-                else:
-                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+        except UnsupportedRequirement:
+            raise
+        except:
+            # Anything else (including KeyboardInterrupt) is logged here and
+            # not re-raised; the final_status checks after the finally clause
+            # decide what the caller sees.
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                logger.error("Interrupted, marking pipeline as failed")
+            else:
+                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            finally:
-                self.cond.release()
+            if runnerjob and runnerjob.uuid and self.work_api == "containers":
+                # NOTE(review): presumably priority 0 cancels the container request -- confirm.
+                self.api.container_requests().update(uuid=runnerjob.uuid,
+                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
+        finally:
+            self.cond.release()
+            # Always stop the polling thread and wait for it to exit.
+            self.stop_polling.set()
+            self.polling_thread.join()
 
-            return self.final_output
+        if self.final_status == "UnsupportedRequirement":
+            raise UnsupportedRequirement("Check log for details.")
 
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
+
+        if self.final_output is None:
+            raise WorkflowException("Workflow did not return a result.")
+
+        if kwargs.get("compute_checksum"):
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
+        return self.final_output
+
+
+def versionstring():
+    """Return a version string of key packages for provenance and debugging.
+
+    The result names the invoked program (sys.argv[0]) with the
+    arvados-cwl-runner version, followed by the arvados-python-client and
+    cwltool versions.
+    """
+
+    # Resolve all three distributions first, in the order they are reported.
+    package_names = ("arvados-cwl-runner", "arvados-python-client", "cwltool")
+    resolved = [pkg_resources.require(name) for name in package_names]
+    runner_dists, client_dists, cwltool_dists = resolved
+
+    fields = (sys.argv[0], runner_dists[0].version,
+              "arvados-python-client", client_dists[0].version,
+              "cwltool", cwltool_dists[0].version)
+    return "%s %s, %s %s, %s %s" % fields
+
+
+def arg_parser():  # type: () -> argparse.ArgumentParser
+    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+
+    parser.add_argument("--basedir", type=str,
+                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
+    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
+                        help="Output directory, default current directory")
+
+    parser.add_argument("--eval-timeout",
+                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
+                        type=float,
+                        default=20)
+    parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
+    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
+    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
+
+    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
-def main(args, stdout, stderr, api_client=None):
-    args.insert(0, "--leave-outputs")
-    parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
@@ -445,12 +333,65 @@ def main(args, stdout, stderr, api_client=None):
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="")
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
+                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
+                        default=False)
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+                        default=True, dest="submit")
+    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+                        default=True, dest="submit")
+    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+                        default=True, dest="wait")
+    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+                        default=True, dest="wait")
+
+    parser.add_argument("--api", type=str,
+                        default=None, dest="work_api",
+                        help="Select work submission API, one of 'jobs' or 'containers'.")
+
+    parser.add_argument("--compute-checksum", action="store_true", default=False,
+                        help="Compute checksum of contents while collecting outputs",
+                        dest="compute_checksum")
+
+    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
+
+    return parser
+
+
+def main(args, stdout, stderr, api_client=None):
+    """Parse command line args and hand the run over to cwltool.main.main.
+
+    args -- list of command line arguments, parsed with arg_parser().
+    stdout, stderr -- passed through to cwltool.main.main.
+    api_client -- optional API client; when None, one is created with
+    arvados.api('v1', model=OrderedJsonModel()).
+
+    Returns 1 if creating the API client or the ArvCwlRunner fails
+    (the error is logged); otherwise returns the result of
+    cwltool.main.main.
+    """
+    parser = arg_parser()
+
+    job_order_object = None
+    arvargs = parser.parse_args(args)
+    # Creating or updating a template/workflow with no input object given:
+    # supply an empty job order up front.
+    # NOTE(review): the tuple is presumably (job order, base path) -- confirm
+    # against cwltool.main.main.
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+        job_order_object = ({}, "")
 
     try:
-        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+        if api_client is None:
+            api_client=arvados.api('v1', model=OrderedJsonModel())
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
     except Exception as e:
         logger.error(e)
         return 1
 
-    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+    # These attributes are not defined by arg_parser(); set them explicitly
+    # on the parsed namespace before passing it on.
+    arvargs.conformance_test = None
+    arvargs.use_container = True
+
+    return cwltool.main.main(args=arvargs,
+                             stdout=stdout,
+                             stderr=stderr,
+                             executor=runner.arvExecutor,
+                             makeTool=runner.arvMakeTool,
+                             versionfunc=versionstring,
+                             job_order_object=job_order_object,
+                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))