100032: Add metrics to job submission in --debug mode.
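Note on the metrics this commit adds: the diff below wraps job submission and completion handling in "with Perf(logger, ...)" blocks imported from the new .perf module, which is not itself part of this diff. A minimal sketch of the kind of timing context manager assumed here, recording wall-clock time and reporting it at the logger's DEBUG level (so the numbers only appear when --debug is in effect), might look like:

import time

class Perf(object):
    # Illustrative sketch only; the real helper lives in arvados_cwl/perf.py,
    # which is not included in this diff.
    def __init__(self, logger, name):
        self.logger = logger
        self.name = name

    def __enter__(self):
        # Record the start time of the wrapped block.
        self.start = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Log elapsed wall-clock time at DEBUG level; never suppress
        # exceptions raised inside the timed block.
        self.logger.debug("%s took %.3f seconds", self.name, time.time() - self.start)
        return False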
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index cf18894eaa61394a14165f3056b5e00063899209..7bfdba8b80e48d9cb24d7054933b428aa3729764 100644 (file)
 #!/usr/bin/env python
 
+# Implement the cwl-runner interface for submitting and running work on Arvados,
+# using either the Crunch jobs API or the Crunch containers API.
+
 import argparse
-import arvados
-import arvados.events
-import arvados.commands.keepdocker
-import arvados.commands.run
-import cwltool.draft2tool
-import cwltool.workflow
-import cwltool.main
-import threading
-import cwltool.docker
-import fnmatch
 import logging
-import re
 import os
+import sys
+import threading
+import hashlib
+from functools import partial
+import pkg_resources  # part of setuptools
 
-from cwltool.process import get_feature
-
-logger = logging.getLogger('arvados.cwl-runner')
-logger.setLevel(logging.INFO)
-
-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
-    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
-        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
-    sp = dockerRequirement["dockerImageId"].split(":")
-    image_name = sp[0]
-    image_tag = sp[1] if len(sp) > 1 else None
-
-    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
-                                                            image_name=image_name,
-                                                            image_tag=image_tag)
-
-    if not images:
-        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = [image_name]
-        if image_tag:
-            args.append(image_tag)
-        arvados.commands.keepdocker.main(args)
-
-    return dockerRequirement["dockerImageId"]
-
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
-    def __init__(self, basedir):
-        self.collections = {}
-        self.basedir = basedir
-
-    def get_collection(self, path):
-        p = path.split("/")
-        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
-            pdh = p[0][5:]
-            if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
-            return (self.collections[pdh], "/".join(p[1:]))
-        else:
-            return (None, path)
-
-    def _match(self, collection, patternsegments, parent):
-        if not patternsegments:
-            return []
-
-        if not isinstance(collection, arvados.collection.RichCollectionBase):
-            return []
-
-        ret = []
-        # iterate over the files and subcollections in 'collection'
-        for filename in collection:
-            if patternsegments[0] == '.':
-                # Pattern contains something like "./foo" so just shift
-                # past the "./"
-                ret.extend(self._match(collection, patternsegments[1:], parent))
-            elif fnmatch.fnmatch(filename, patternsegments[0]):
-                cur = os.path.join(parent, filename)
-                if len(patternsegments) == 1:
-                    ret.append(cur)
-                else:
-                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
-        return ret
-
-    def glob(self, pattern):
-        collection, rest = self.get_collection(pattern)
-        patternsegments = rest.split("/")
-        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
-    def open(self, fn, mode):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.open(rest, mode)
-        else:
-            return open(self._abs(fn), mode)
-
-    def exists(self, fn):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.exists(rest)
-        else:
-            return os.path.exists(self._abs(fn))
-
-class ArvadosJob(object):
-    def __init__(self, runner):
-        self.arvrunner = runner
-        self.running = False
-
-    def run(self, dry_run=False, pull_image=True, **kwargs):
-        script_parameters = {
-            "command": self.command_line
-        }
-        runtime_constraints = {}
-
-        if self.generatefiles:
-            vwd = arvados.collection.Collection()
-            script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
-        if self.environment:
-            script_parameters["task.env"].update(self.environment)
-
-        if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
-        if self.stdout:
-            script_parameters["task.stdout"] = self.stdout
-
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-        if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
-
-        try:
-            response = self.arvrunner.api.jobs().create(body={
-                "script": "crunchrunner",
-                "repository": kwargs["repository"],
-                "script_version": "master",
-                "script_parameters": {"tasks": [script_parameters]},
-                "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute()
-
-            self.arvrunner.jobs[response["uuid"]] = self
-
-            logger.info("Job %s is %s", response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
-        except Exception as e:
-            logger.error("Got error %s" % str(e))
-            self.output_callback({}, "permanentFail")
-
-
-    def done(self, record):
-        try:
-            if record["state"] == "Complete":
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-
-            try:
-                outputs = {}
-                if record["output"]:
-                    outputs = self.collect_outputs("keep:" + record["output"])
-            except Exception as e:
-                logger.exception("Got exception while collecting job outputs:")
-                processStatus = "permanentFail"
-
-            self.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
-        self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = []
-
-        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
-        for src in referenced_files:
-            if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
-            if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
-                if kwargs.get("conformance_test"):
-                    self._pathmap[src] = (src, ab)
-                elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab, st))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                else:
-                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
-        if uploadfiles:
-            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
-                                             arvrunner.api,
-                                             dry_run=kwargs.get("dry_run"),
-                                             num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s")
-
-        for src, ab, st in uploadfiles:
-            arvrunner.add_uploaded(src, (ab, st.fn))
-            self._pathmap[src] = (ab, st.fn)
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
 
+import arvados
+import arvados.config
 
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
 
-class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
-        self.arvrunner = arvrunner
+from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
+from arvados.api import OrderedJsonModel
 
-    def makeJobRunner(self):
-        return ArvadosJob(self.arvrunner)
+logger = logging.getLogger('arvados.cwl-runner')
+logger.setLevel(logging.INFO)
 
-    def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+class ArvCwlRunner(object):
+    """Execute a CWL tool or workflow, submit work (using either jobs or
+    containers API), wait for them to complete, and report output.
 
+    """
 
-class ArvCwlRunner(object):
-    def __init__(self, api_client):
+    def __init__(self, api_client, work_api=None):
         self.api = api_client
-        self.jobs = {}
+        self.processes = {}
         self.lock = threading.Lock()
         self.cond = threading.Condition(self.lock)
         self.final_output = None
+        self.final_status = None
         self.uploaded = {}
+        self.num_retries = 4
+        self.uuid = None
+        self.work_api = work_api
+        self.stop_polling = threading.Event()
+        self.poll_api = None
+
+        if self.work_api is None:
+            # todo: autodetect API to use.
+            self.work_api = "jobs"
+
+        if self.work_api not in ("containers", "jobs"):
+            raise Exception("Unsupported API '%s'" % self.work_api)
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
+            kwargs["work_api"] = self.work_api
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
         if processStatus == "success":
-            logger.info("Overall job status is %s", processStatus)
+            logger.info("Overall process status is %s", processStatus)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
         else:
-            logger.warn("Overall job status is %s", processStatus)
+            logger.warn("Overall process status is %s", processStatus)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        self.final_status = processStatus
         self.final_output = out
 
     def on_message(self, event):
         if "object_uuid" in event:
-                if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                    if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                        logger.info("Job %s is Running", event["object_uuid"])
-                        with self.lock:
-                            self.jobs[event["object_uuid"]].running = True
-                    elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                        logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
-                        try:
-                            self.cond.acquire()
-                            self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
-                            self.cond.notify()
-                        finally:
-                            self.cond.release()
+            if event["object_uuid"] in self.processes and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
+                    uuid = event["object_uuid"]
+                    with self.lock:
+                        j = self.processes[uuid]
+                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                    uuid = event["object_uuid"]
+                    try:
+                        self.cond.acquire()
+                        j = self.processes[uuid]
+                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
+                        self.cond.notify()
+                    finally:
+                        self.cond.release()
+
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread.
+        """
+
+        while True:
+            self.stop_polling.wait(15)
+            if self.stop_polling.is_set():
+                break
+            with self.lock:
+                keys = self.processes.keys()
+            if not keys:
+                continue
+
+            if self.work_api == "containers":
+                table = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                table = self.poll_api.jobs()
+
+            try:
+                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.warn("Error checking states on API server: %s", e)
+                continue
+
+            for p in proc_states["items"]:
+                self.on_message({
+                    "object_uuid": p["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": p
+                    }
+                })
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -268,65 +145,253 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
-    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
-
-        self.fs_access = CollectionFsAccess(input_basedir)
-
-        kwargs["fs_access"] = self.fs_access
-        kwargs["enable_reuse"] = args.enable_reuse
-        kwargs["repository"] = args.repository
-
-        if kwargs.get("conformance_test"):
-            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
+    def check_writable(self, obj):
+        if isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            for v in obj.itervalues():
+                self.check_writable(v)
+        if isinstance(obj, list):
+            for v in obj:
+                self.check_writable(v)
+
+    def arvExecutor(self, tool, job_order, **kwargs):
+        self.debug = kwargs.get("debug")
+
+        tool.visit(self.check_writable)
+
+        if kwargs.get("quiet"):
+            logger.setLevel(logging.WARN)
+            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
+
+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.pipeline = None
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
+
+        if kwargs.get("create_template"):
+            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
+            tmpl.save()
+            # cwltool.main will write our return value to stdout.
+            return tmpl.uuid
+
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
+        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+
+        kwargs["make_fs_access"] = make_fs_access
+        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+        kwargs["use_container"] = True
+        kwargs["tmpdir_prefix"] = "tmp"
+        kwargs["on_error"] = "continue"
+        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+
+        if self.work_api == "containers":
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["docker_outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
+        elif self.work_api == "jobs":
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["docker_outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
+
+        runnerjob = None
+        if kwargs.get("submit"):
+            if self.work_api == "containers":
+                if tool.tool["class"] == "CommandLineTool":
+                    runnerjob = tool.job(job_order,
+                                         self.output_callback,
+                                         **kwargs).next()
+                else:
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+            else:
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
+            # Create pipeline for local run
+            self.pipeline = self.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.project_uuid,
+                    "name": shortname(tool.tool["id"]),
+                    "components": {},
+                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
+        if runnerjob and not kwargs.get("wait"):
+            runnerjob.run()
+            return runnerjob.uuid
+
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
+
+        if runnerjob:
+            jobiter = iter((runnerjob,))
         else:
+            if "cwl_runner_job" in kwargs:
+                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
             jobiter = tool.job(job_order,
-                            input_basedir,
-                            self.output_callback,
-                            **kwargs)
+                               self.output_callback,
+                               **kwargs)
+
+        try:
+            self.cond.acquire()
+            # Will continue to hold the lock for the duration of this code
+            # except when in cond.wait(), at which point on_message can update
+            # job state and process output callbacks.
 
             for runnable in jobiter:
                 if runnable:
-                    with self.lock:
+                    with Perf(logger, "run"):
                         runnable.run(**kwargs)
                 else:
-                    if self.jobs:
-                        try:
-                            self.cond.acquire()
-                            self.cond.wait()
-                        finally:
-                            self.cond.release()
+                    if self.processes:
+                        self.cond.wait(1)
                     else:
-                        logger.error("Workflow cannot make any more progress.")
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
 
-            while self.jobs:
-                try:
-                    self.cond.acquire()
-                    self.cond.wait()
-                finally:
-                    self.cond.release()
+            while self.processes:
+                self.cond.wait(1)
 
-            events.close()
+        except UnsupportedRequirement:
+            raise
+        except:
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                logger.error("Interrupted, marking pipeline as failed")
+            else:
+                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if runnerjob and runnerjob.uuid and self.work_api == "containers":
+                self.api.container_requests().update(uuid=runnerjob.uuid,
+                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
+        finally:
+            self.cond.release()
+            self.stop_polling.set()
+            self.polling_thread.join()
 
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+        if self.final_status == "UnsupportedRequirement":
+            raise UnsupportedRequirement("Check log for details.")
 
-            return self.final_output
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
 
+        if self.final_output is None:
+            raise WorkflowException("Workflow did not return a result.")
+
+        if kwargs.get("compute_checksum"):
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
+        return self.final_output
+
+
+def versionstring():
+    """Print version string of key packages for provenance and debugging."""
+
+    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+    arvpkg = pkg_resources.require("arvados-python-client")
+    cwlpkg = pkg_resources.require("cwltool")
+
+    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+                                    "arvados-python-client", arvpkg[0].version,
+                                    "cwltool", cwlpkg[0].version)
+
+
+def arg_parser():  # type: () -> argparse.ArgumentParser
+    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
+
+    parser.add_argument("--basedir", type=str,
+                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
+    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
+                        help="Output directory, default current directory")
+
+    parser.add_argument("--eval-timeout",
+                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
+                        type=float,
+                        default=20)
+    parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
+    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
+    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
+
+    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
-def main(args, stdout, stderr, api_client=None):
-    runner = ArvCwlRunner(api_client=arvados.api('v1'))
-    args.insert(0, "--leave-outputs")
-    parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
     exgroup.add_argument("--disable-reuse", action="store_false",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument('--repository', type=str, default="peter/crunchrunner", help="Repository containing the 'crunchrunner' program.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
+                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
+                        default=False)
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+                        default=True, dest="submit")
+    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+                        default=True, dest="submit")
+    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+                        default=True, dest="wait")
+    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+                        default=True, dest="wait")
+
+    parser.add_argument("--api", type=str,
+                        default=None, dest="work_api",
+                        help="Select work submission API, one of 'jobs' or 'containers'.")
 
-    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+    parser.add_argument("--compute-checksum", action="store_true", default=False,
+                        help="Compute checksum of contents while collecting outputs",
+                        dest="compute_checksum")
+
+    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
+
+    return parser
+
+
+def main(args, stdout, stderr, api_client=None):
+    parser = arg_parser()
+
+    job_order_object = None
+    arvargs = parser.parse_args(args)
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+        job_order_object = ({}, "")
+
+    try:
+        if api_client is None:
+            api_client=arvados.api('v1', model=OrderedJsonModel())
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
+    except Exception as e:
+        logger.error(e)
+        return 1
+
+    arvargs.conformance_test = None
+    arvargs.use_container = True
+
+    return cwltool.main.main(args=arvargs,
+                             stdout=stdout,
+                             stderr=stderr,
+                             executor=runner.arvExecutor,
+                             makeTool=runner.arvMakeTool,
+                             versionfunc=versionstring,
+                             job_order_object=job_order_object,
+                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
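
With the argument parser above, the runner can also be driven programmatically through main(args, stdout, stderr, api_client=None). A usage sketch follows; workflow.cwl and job.yml are hypothetical file names, and --debug enables the DEBUG-level logging that the Perf metrics in this commit are assumed to use:

import sys
import arvados_cwl

# Hypothetical inputs.  --submit is already the default, and --api jobs matches
# the fallback ArvCwlRunner picks when no work API is specified.
rc = arvados_cwl.main(["--debug", "--submit", "--api", "jobs", "workflow.cwl", "job.yml"],
                      sys.stdout, sys.stderr)
sys.exit(rc)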