Merge branch '10583-cwl-salad-ver' refs #10583
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 80734d2f7c8e41e45ce25b7bd38cba6094b071e3..9cabedf7f77a15532ad1379d638d5ca9a4169e44 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -9,37 +9,47 @@ import os
 import sys
 import threading
 import hashlib
+import copy
+import json
 from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
+import schema_salad
 
 import arvados
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .runner import Runner, upload_instance
 from .arvtool import ArvadosCommandTool
+from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
-from .arvworkflow import make_workflow
+from .perf import Perf
+from .pathmapper import FinalOutputPathMapper
+from ._version import __version__
 
-from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pack import pack
+from cwltool.process import shortname, UnsupportedRequirement, getListing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 
     """
 
-    def __init__(self, api_client, work_api=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -49,20 +59,40 @@ class ArvCwlRunner(object):
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
-        self.work_api = work_api
         self.stop_polling = threading.Event()
         self.poll_api = None
+        self.pipeline = None
+        self.final_output_collection = None
+        self.output_name = output_name
+        self.output_tags = output_tags
+        self.project_uuid = None
 
-        if self.work_api is None:
-            # todo: autodetect API to use.
-            self.work_api = "jobs"
+        if keep_client is not None:
+            self.keep_client = keep_client
+        else:
+            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        if self.work_api not in ("containers", "jobs"):
-            raise Exception("Unsupported API '%s'" % self.work_api)
+        for api in ["jobs", "containers"]:
+            try:
+                methods = self.api._rootDesc.get('resources')[api]['methods']
+                if ('httpMethod' in methods['create'] and
+                    (work_api == api or work_api is None)):
+                    self.work_api = api
+                    break
+            except KeyError:
+                pass
+        if not self.work_api:
+            if work_api is None:
+                raise Exception("No supported APIs")
+            else:
+                raise Exception("Unsupported API '%s'" % work_api)
 
-    def arvMakeTool(self, toolpath_object, **kwargs):
+    def arv_make_tool(self, toolpath_object, **kwargs):
+        kwargs["work_api"] = self.work_api
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
+            return ArvadosWorkflow(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
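
The constructor above now probes the API server's discovery document instead of blindly defaulting to the jobs API. A minimal standalone sketch of the same probe, assuming a googleapiclient-style client whose _rootDesc attribute holds the discovery document (detect_work_api is a hypothetical helper, not part of this commit):

    def detect_work_api(api_client, requested=None):
        # Prefer "jobs", matching the loop order in __init__ above.
        for api in ["jobs", "containers"]:
            try:
                methods = api_client._rootDesc.get('resources')[api]['methods']
                # An API counts as usable if its "create" method is exposed.
                if 'httpMethod' in methods['create'] and requested in (api, None):
                    return api
            except KeyError:
                pass
        return None  # caller raises "No supported APIs" / "Unsupported API"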
@@ -95,8 +125,10 @@ class ArvCwlRunner(object):
                     try:
                         self.cond.acquire()
                         j = self.processes[uuid]
-                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        txt = self.work_api[0].upper() + self.work_api[1:-1]
+                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        with Perf(metrics, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
@@ -107,34 +139,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
-
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
-
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
+
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
+
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
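
The rewritten poller above wraps its loop in try/except/finally so that a fatal polling error clears the process table and wakes the main thread instead of deadlocking it. It keeps feeding on_message() synthetic events shaped like websocket "update" events, so no second code path is needed. The shape, with illustrative values:

    event = {
        "object_uuid": "zzzzz-dz642-xxxxxxxxxxxxxxx",  # container or job UUID
        "event_type": "update",
        "properties": {
            # the full record returned by table.list() goes here
            "new_attributes": {"uuid": "zzzzz-dz642-xxxxxxxxxxxxxxx",
                               "state": "Complete"},
        },
    }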
@@ -152,31 +193,130 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
-    def arvExecutor(self, tool, job_order, **kwargs):
+    def make_output_collection(self, name, tagsString, outputObj):
+        outputObj = copy.deepcopy(outputObj)
+
+        files = []
+        def capture(fileobj):
+            files.append(fileobj)
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
+        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+
+        final = arvados.collection.Collection(api_client=self.api,
+                                              keep_client=self.keep_client,
+                                              num_retries=self.num_retries)
+
+        srccollections = {}
+        for k,v in generatemapper.items():
+            if k.startswith("_:"):
+                if v.type == "Directory":
+                    continue
+                if v.type == "CreateFile":
+                    with final.open(v.target, "wb") as f:
+                        f.write(v.resolved.encode("utf-8"))
+                    continue
+
+            if not k.startswith("keep:"):
+                raise Exception("Output source is not in keep or a literal")
+            sp = k.split("/")
+            srccollection = sp[0][5:]
+            if srccollection not in srccollections:
+                try:
+                    srccollections[srccollection] = arvados.collection.CollectionReader(
+                        srccollection,
+                        api_client=self.api,
+                        keep_client=self.keep_client,
+                        num_retries=self.num_retries)
+                except arvados.errors.ArgumentError as e:
+                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                    raise
+            reader = srccollections[srccollection]
+            try:
+                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except IOError as e:
+                logger.warn("While preparing output collection: %s", e)
+
+        def rewrite(fileobj):
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+            for k in ("basename", "listing", "contents"):
+                if k in fileobj:
+                    del fileobj[k]
+
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
+
+        with final.open("cwl.output.json", "w") as f:
+            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
+
+        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+
+        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+                    final.api_response()["name"],
+                    final.manifest_locator())
+
+        final_uuid = final.manifest_locator()
+        tags = tagsString.split(',')
+        for tag in tags:
+            self.api.links().create(body={
+                "head_uuid": final_uuid, "link_class": "tag", "name": tag
+            }).execute(num_retries=self.num_retries)
+
+        def finalcollection(fileobj):
+            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+        adjustDirObjs(outputObj, finalcollection)
+        adjustFileObjs(outputObj, finalcollection)
+
+        return (outputObj, final)
+
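
To make the rewriting above concrete, here is the life of one File output as it passes through make_output_collection (hashes, names, and the mapped target are made up for illustration):

    source   = {"class": "File",
                "location": "keep:99999999999999999999999999999999+99/foo.txt",
                "basename": "foo.txt"}
    # after rewrite(): as written into cwl.output.json inside the collection
    in_json  = {"class": "File", "location": "foo.txt"}
    # after finalcollection(): as returned to the caller
    returned = {"class": "File",
                "location": "keep:<output collection PDH>/foo.txt"}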
+    def set_crunch_output(self):
+        if self.work_api == "containers":
+            try:
+                current = self.api.containers().current().execute(num_retries=self.num_retries)
+                self.api.containers().update(uuid=current['uuid'],
+                                             body={
+                                                 'output': self.final_output_collection.portable_data_hash(),
+                                             }).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.info("Setting container output: %s", e)
+        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+                                   body={
+                                       'output': self.final_output_collection.portable_data_hash(),
+                                       'success': self.final_status == "success",
+                                       'progress':1.0
+                                   }).execute(num_retries=self.num_retries)
+
+    def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
         tool.visit(self.check_writable)
 
-        if kwargs.get("quiet"):
-            logger.setLevel(logging.WARN)
-            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
-        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
+                                                                 api_client=self.api,
+                                                                 keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        if kwargs.get("create_template"):
-            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
-            tmpl.save()
-            # cwltool.main will write our return value to stdout.
-            return tmpl.uuid
-
-        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
-            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
+            if self.work_api == "jobs":
+                tmpl = RunnerTemplate(self, tool, job_order,
+                                      kwargs.get("enable_reuse"),
+                                      uuid=existing_uuid)
+                tmpl.save()
+                # cwltool.main will write our return value to stdout.
+                return tmpl.uuid
+            else:
+                return upload_workflow(self, tool, job_order,
+                                       self.project_uuid,
+                                       uuid=existing_uuid)
 
-        self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
@@ -196,6 +336,8 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
+
         runnerjob = None
         if kwargs.get("submit"):
             if self.work_api == "containers":
@@ -204,9 +346,9 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -219,7 +361,7 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run()
+            runnerjob.run(wait=kwargs.get("wait"))
             return runnerjob.uuid
 
         self.poll_api = arvados.api('v1')
@@ -241,15 +383,25 @@ class ArvCwlRunner(object):
             # except when in cond.wait(), at which point on_message can update
             # job state and process output callbacks.
 
+            loopperf = Perf(metrics, "jobiter")
+            loopperf.__enter__()
             for runnable in jobiter:
+                loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(metrics, "run"):
+                        runnable.run(**kwargs)
                 else:
                     if self.processes:
                         self.cond.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
+                loopperf.__enter__()
+            loopperf.__exit__()
 
             while self.processes:
                 self.cond.wait(1)
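
The Perf objects threaded through this loop come from the new .perf module. A plausible minimal implementation, assuming Perf simply logs elapsed wall-clock time to the given logger; note that __exit__ must tolerate being called with no arguments, as the loop above does (this sketch is an illustration, not the shipped code):

    import time

    class Perf(object):
        def __init__(self, logger, name):
            self.logger = logger
            self.name = name
        def __enter__(self):
            self.start = time.time()
            return self
        def __exit__(self, exc_type=None, exc_value=None, tb=None):
            self.logger.debug("%s took %.3f seconds", self.name,
                              time.time() - self.start)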
@@ -260,7 +412,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -275,13 +427,24 @@ class ArvCwlRunner(object):
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
-        if self.final_status != "success":
-            raise WorkflowException("Workflow failed.")
-
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
+        if kwargs.get("submit") and isinstance(runnerjob, Runner):
+            logger.info("Final output collection %s", runnerjob.final_output)
+        else:
+            if self.output_name is None:
+                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+            if self.output_tags is None:
+                self.output_tags = ""
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+            self.set_crunch_output()
+
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
+
         if kwargs.get("compute_checksum"):
+            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output
@@ -294,7 +457,7 @@ def versionstring():
     arvpkg = pkg_resources.require("arvados-python-client")
     cwlpkg = pkg_resources.require("cwltool")
 
-    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                     "arvados-python-client", arvpkg[0].version,
                                     "cwltool", cwlpkg[0].version)
 
@@ -318,6 +481,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
 
+    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
+
     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -328,7 +493,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, the workflow jobs will go to the user's home project.")
+    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
+    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -338,9 +505,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="submit")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
-    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
-    exgroup.add_argument("--update-workflow", type=str, help="Update Arvados workflow.")
+    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+                         dest="create_workflow")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -350,7 +518,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--api", type=str,
                         default=None, dest="work_api",
-                        help="Select work submission API, one of 'jobs' or 'containers'.")
+                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
 
     parser.add_argument("--compute-checksum", action="store_true", default=False,
                         help="Compute checksum of contents while collecting outputs",
@@ -361,31 +529,70 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     return parser
 
-
-def main(args, stdout, stderr, api_client=None):
+def add_arv_hints():
+    cache = {}
+    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
+    cache["http://arvados.org/cwl"] = res.read()
+    res.close()
+    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
+    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
+    for n in extnames.names:
+        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
+            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
+        document_loader.idx["http://arvados.org/cwl#"+n] = {}
+
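
add_arv_hints() loads the bundled arv-cwl-schema.yml, registers each http://arvados.org/cwl# name alongside the standard CWL v1.0 names, and pre-seeds the document loader's index so those references resolve without a network fetch. The practical effect is that tool documents may carry Arvados extension hints, e.g. (shown as the equivalent Python structure; the extension class name here is hypothetical):

    tool_fragment = {
        "$namespaces": {"arv": "http://arvados.org/cwl#"},
        "hints": [{"class": "arv:SomeExtension"}],  # validated against the loaded schema
    }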
+def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+
+    if arvargs.update_workflow:
+        if arvargs.update_workflow.find('-7fd4e-') == 5:
+            want_api = 'containers'
+        elif arvargs.update_workflow.find('-p5p6p-') == 5:
+            want_api = 'jobs'
+        else:
+            want_api = None
+        if want_api and arvargs.work_api and want_api != arvargs.work_api:
+            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+                arvargs.update_workflow, want_api, arvargs.work_api))
+            return 1
+        arvargs.work_api = want_api
+
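
The infix test above works because Arvados UUIDs encode the object type in their middle segment, which starts at character 5, just after the five-character cluster prefix. With an illustrative cluster prefix of zzzzz:

    # zzzzz-7fd4e-012345678901234   workflow           -> containers API
    # zzzzz-p5p6p-012345678901234   pipeline template  -> jobs API
    assert "zzzzz-7fd4e-012345678901234".find('-7fd4e-') == 5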
+    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
+    add_arv_hints()
+
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
     except Exception as e:
         logger.error(e)
         return 1
 
+    if arvargs.debug:
+        logger.setLevel(logging.DEBUG)
+
+    if arvargs.quiet:
+        logger.setLevel(logging.WARN)
+        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+    if arvargs.metrics:
+        metrics.setLevel(logging.DEBUG)
+        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
+
     arvargs.conformance_test = None
     arvargs.use_container = True
+    arvargs.relax_path_checks = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
-                             executor=runner.arvExecutor,
-                             makeTool=runner.arvMakeTool,
+                             executor=runner.arv_executor,
+                             makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
                              make_fs_access=partial(CollectionFsAccess, api_client=api_client))