11948: if -> elif style fix
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index cacb7b81a0321e9f6da0726fffc04f8c16bee0eb..695597f839c1ceee5ccbb67f3391218d3d1a8e34 100644 (file)
@@ -1,4 +1,7 @@
 #!/usr/bin/env python
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
 
 # Implement cwl-runner interface for submitting and running work on Arvados, using
 # either the Crunch jobs API or Crunch containers API.
@@ -11,31 +14,35 @@ import threading
 import hashlib
 import copy
 import json
+import re
 from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
+import cwltool.process
 import schema_salad
+from schema_salad.sourceline import SourceLine
 
 import arvados
 import arvados.config
+from arvados.keep import KeepClient
 from arvados.errors import ApiError
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_instance
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
-from .pathmapper import FinalOutputPathMapper
+from .pathmapper import NoFollowPathMapper
 from ._version import __version__
 
 from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
@@ -43,6 +50,9 @@ logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
+arvados.log_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(name)s %(levelname)s: %(message)s',
+        '%Y-%m-%d %H:%M:%S'))
 
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -50,7 +60,7 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -58,7 +68,7 @@ class ArvCwlRunner(object):
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
-        self.num_retries = 4
+        self.num_retries = num_retries
         self.uuid = None
         self.stop_polling = threading.Event()
         self.poll_api = None
@@ -67,12 +77,17 @@ class ArvCwlRunner(object):
         self.output_name = output_name
         self.output_tags = output_tags
         self.project_uuid = None
+        self.intermediate_output_ttl = 0
+        self.intermediate_output_collections = []
+        self.trash_intermediate = False
 
         if keep_client is not None:
             self.keep_client = keep_client
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
@@ -93,6 +108,11 @@ class ArvCwlRunner(object):
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
+        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
+                                                api_client=self.api,
+                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                                num_retries=self.num_retries,
+                                                overrides=kwargs.get("override_tools"))
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -121,7 +141,7 @@ class ArvCwlRunner(object):
                     uuid = event["object_uuid"]
                     with self.lock:
                         j = self.processes[uuid]
-                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        logger.info("%s %s is Running", self.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
@@ -129,14 +149,16 @@ class ArvCwlRunner(object):
                     try:
                         self.cond.acquire()
                         j = self.processes[uuid]
-                        txt = self.work_api[0].upper() + self.work_api[1:-1]
-                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                         with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def label(self, obj):
+        return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
     def poll_states(self):
         """Poll status of jobs or containers listed in the processes dict.
 
@@ -187,15 +209,35 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
-    def check_writable(self, obj):
+    def add_intermediate_output(self, uuid):
+        if uuid:
+            self.intermediate_output_collections.append(uuid)
+
+    def trash_intermediate_output(self):
+        logger.info("Cleaning up intermediate output collections")
+        for i in self.intermediate_output_collections:
+            try:
+                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+            except:
+                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                break
+
+    def check_features(self, obj):
         if isinstance(obj, dict):
             if obj.get("writable"):
-                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+            if obj.get("class") == "DockerRequirement":
+                if obj.get("dockerOutputDirectory"):
+                    # TODO: can be supported by containers API, but not jobs API.
+                    raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+                        "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             for v in obj.itervalues():
-                self.check_writable(v)
-        if isinstance(obj, list):
-            for v in obj:
-                self.check_writable(v)
+                self.check_features(v)
+        elif isinstance(obj, list):
+            for i,v in enumerate(obj):
+                with SourceLine(obj, i, UnsupportedRequirement):
+                    self.check_features(v)
 
     def make_output_collection(self, name, tagsString, outputObj):
         outputObj = copy.deepcopy(outputObj)
@@ -207,13 +249,12 @@ class ArvCwlRunner(object):
         adjustDirObjs(outputObj, capture)
         adjustFileObjs(outputObj, capture)
 
-        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
 
         final = arvados.collection.Collection(api_client=self.api,
                                               keep_client=self.keep_client,
                                               num_retries=self.num_retries)
 
-        srccollections = {}
         for k,v in generatemapper.items():
             if k.startswith("_:"):
                 if v.type == "Directory":
@@ -227,20 +268,13 @@ class ArvCwlRunner(object):
                 raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
-            if srccollection not in srccollections:
-                try:
-                    srccollections[srccollection] = arvados.collection.CollectionReader(
-                        srccollection,
-                        api_client=self.api,
-                        keep_client=self.keep_client,
-                        num_retries=self.num_retries)
-                except arvados.errors.ArgumentError as e:
-                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
-                    raise
-            reader = srccollections[srccollection]
             try:
+                reader = self.collection_cache.get(srccollection)
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except arvados.errors.ArgumentError as e:
+                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                raise
             except IOError as e:
                 logger.warn("While preparing output collection: %s", e)
 
@@ -291,6 +325,10 @@ class ArvCwlRunner(object):
                                              body={
                                                  'output': self.final_output_collection.portable_data_hash(),
                                              }).execute(num_retries=self.num_retries)
+                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
+                                              body={
+                                                  'is_trashed': True
+                                              }).execute(num_retries=self.num_retries)
             except Exception as e:
                 logger.info("Setting container output: %s", e)
         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
@@ -304,30 +342,65 @@ class ArvCwlRunner(object):
     def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
-        tool.visit(self.check_writable)
+        tool.visit(self.check_features)
 
         self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 api_client=self.api,
-                                                                 keep_client=self.keep_client)
+                                                                 collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
+
+        self.trash_intermediate = kwargs["trash_intermediate"]
+        if self.trash_intermediate and self.work_api != "containers":
+            raise Exception("--trash-intermediate is only supported with --api=containers.")
+
+        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+        if self.intermediate_output_ttl and self.work_api != "containers":
+            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
+        if self.intermediate_output_ttl < 0:
+            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+
+        if not kwargs.get("name"):
+            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+        # Also uploads docker images.
+        override_tools = {}
+        upload_workflow_deps(self, tool, override_tools)
+
+        # Reload tool object which may have been updated by
+        # upload_workflow_deps
+        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+                                  makeTool=self.arv_make_tool,
+                                  loader=tool.doc_loader,
+                                  avsc_names=tool.doc_schema,
+                                  metadata=tool.metadata,
+                                  override_tools=override_tools)
+
+        # Upload local file references in the job order.
+        job_order = upload_job_order(self, "%s input" % kwargs["name"],
+                                     tool, job_order)
+
         existing_uuid = kwargs.get("update_workflow")
         if existing_uuid or kwargs.get("create_workflow"):
+            # Create a pipeline template or workflow record and exit.
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
+                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                      name=kwargs["name"])
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
-                return tmpl.uuid
-            else:
-                return upload_workflow(self, tool, job_order,
-                                       self.project_uuid,
-                                       uuid=existing_uuid,
-                                       submit_runner_ram=kwargs.get("submit_runner_ram"))
+                return (tmpl.uuid, "success")
+            elif self.work_api == "containers":
+                return (upload_workflow(self, tool, job_order,
+                                        self.project_uuid,
+                                        uuid=existing_uuid,
+                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                        name=kwargs["name"]),
+                        "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -335,7 +408,6 @@ class ArvCwlRunner(object):
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
         kwargs["use_container"] = True
         kwargs["tmpdir_prefix"] = "tmp"
-        kwargs["on_error"] = "continue"
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
@@ -348,35 +420,45 @@ class ArvCwlRunner(object):
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
-        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
         runnerjob = None
         if kwargs.get("submit"):
+            # Submit a runner job to run the workflow for us.
             if self.work_api == "containers":
-                if tool.tool["class"] == "CommandLineTool":
+                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
+                    kwargs["runnerjob"] = tool.tool["id"]
                     runnerjob = tool.job(job_order,
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
-                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
-            else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
-                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
-
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+                                                self.output_name,
+                                                self.output_tags,
+                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                                name=kwargs.get("name"),
+                                                on_error=kwargs.get("on_error"),
+                                                submit_runner_image=kwargs.get("submit_runner_image"),
+                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
+            elif self.work_api == "jobs":
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+                                      self.output_name,
+                                      self.output_tags,
+                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                      name=kwargs.get("name"),
+                                      on_error=kwargs.get("on_error"),
+                                      submit_runner_image=kwargs.get("submit_runner_image"))
+        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
-                    "name": shortname(tool.tool["id"]),
+                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
             runnerjob.run(wait=kwargs.get("wait"))
-            return runnerjob.uuid
+            return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
@@ -454,14 +536,14 @@ class ArvCwlRunner(object):
             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
             self.set_crunch_output()
 
-        if self.final_status != "success":
-            raise WorkflowException("Workflow failed.")
-
         if kwargs.get("compute_checksum"):
-            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
-        return self.final_output
+        if self.trash_intermediate and self.final_status == "success":
+            self.trash_intermediate_output()
+
+        return (self.final_output, self.final_status)
 
 
 def versionstring():
@@ -488,7 +570,12 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                         type=float,
                         default=20)
-    parser.add_argument("--version", action="store_true", help="Print version and exit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--print-dot", action="store_true",
+                         help="Print workflow visualization in graphviz format and exit")
+    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
@@ -502,10 +589,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
-                        help="")
+                        help="Enable job or container reuse (default)")
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
-                        help="")
+                        help="Disable job or container reuse")
 
     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
@@ -530,9 +617,16 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
+                        default=True, dest="log_timestamps")
+    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
+                        default=True, dest="log_timestamps")
+
     parser.add_argument("--api", type=str,
                         default=None, dest="work_api",
-                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
+                        choices=("jobs", "containers"),
+                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
 
     parser.add_argument("--compute-checksum", action="store_true", default=False,
                         help="Compute checksum of contents while collecting outputs",
@@ -542,22 +636,55 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
                         default=1024)
 
+    parser.add_argument("--submit-runner-image", type=str,
+                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
+                        default=None)
+
+    parser.add_argument("--name", type=str,
+                        help="Name to use for workflow execution instance.",
+                        default=None)
+
+    parser.add_argument("--on-error", type=str,
+                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
+                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))
+
+    parser.add_argument("--enable-dev", action="store_true",
+                        help="Enable loading and running development versions "
+                             "of CWL spec.", default=False)
+
+    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
+                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
+                        default=0)
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--trash-intermediate", action="store_true",
+                        default=False, dest="trash_intermediate",
+                         help="Immediately trash intermediate outputs on workflow success.")
+    exgroup.add_argument("--no-trash-intermediate", action="store_false",
+                        default=False, dest="trash_intermediate",
+                        help="Do not trash intermediate outputs (default).")
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
-    cache = {}
+    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    cache["http://arvados.org/cwl"] = res.read()
+    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
-    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
-    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
-    for n in extnames.names:
-        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
-            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
-        document_loader.idx["http://arvados.org/cwl#"+n] = {}
+    cwltool.process.supportedProcessRequirements.extend([
+        "http://arvados.org/cwl#RunInSingleContainer",
+        "http://arvados.org/cwl#OutputDirType",
+        "http://arvados.org/cwl#RuntimeConstraints",
+        "http://arvados.org/cwl#PartitionRequirement",
+        "http://arvados.org/cwl#APIRequirement",
+        "http://commonwl.org/cwltool#LoadListingRequirement",
+        "http://arvados.org/cwl#IntermediateOutput",
+        "http://arvados.org/cwl#ReuseRequirement"
+    ])
 
 def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
@@ -565,6 +692,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     job_order_object = None
     arvargs = parser.parse_args(args)
 
+    if arvargs.version:
+        print versionstring()
+        return
+
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
@@ -586,25 +717,43 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
+        if keep_client is None:
+            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
+                              num_retries=4, output_name=arvargs.output_name,
+                              output_tags=arvargs.output_tags)
     except Exception as e:
         logger.error(e)
         return 1
 
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
+        logging.getLogger('arvados').setLevel(logging.DEBUG)
 
     if arvargs.quiet:
         logger.setLevel(logging.WARN)
+        logging.getLogger('arvados').setLevel(logging.WARN)
         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
     if arvargs.metrics:
         metrics.setLevel(logging.DEBUG)
         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
 
+    if arvargs.log_timestamps:
+        arvados.log_handler.setFormatter(logging.Formatter(
+            '%(asctime)s %(name)s %(levelname)s: %(message)s',
+            '%Y-%m-%d %H:%M:%S'))
+    else:
+        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+
     arvargs.conformance_test = None
     arvargs.use_container = True
     arvargs.relax_path_checks = True
+    arvargs.validate = None
+    arvargs.print_supported_versions = False
+
+    make_fs_access = partial(CollectionFsAccess,
+                           collection_cache=runner.collection_cache)
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
@@ -613,4 +762,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))
+                             make_fs_access=make_fs_access,
+                             fetcher_constructor=partial(CollectionFetcher,
+                                                         api_client=api_client,
+                                                         fs_access=make_fs_access(""),
+                                                         num_retries=runner.num_retries),
+                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
+                             logger_handler=arvados.log_handler,
+                             custom_schema_callback=add_arv_hints)