13108: Refactor task queue into its own class.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 4701b4d8f13a29a2c1dc8f3bc5558de788a5a1fa..7affade0734536fb3e7ee241bed9357a995eb949 100644 (file)
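This commit moves container/job submission and completion handling onto a
pool of worker threads managed by the new TaskQueue class (presumably added
as sdk/cwl/arvados_cwl/task_queue.py, which is not shown in this diff). A
minimal sketch, assuming only the interface the call sites below rely on
(a constructor taking the shared condition variable and a thread count,
plus add(), in_flight, error, drain(), and join()); the real implementation
may differ:

import Queue
import threading

class TaskQueue(object):
    # Sketch: runs queued callables on a fixed pool of worker threads,
    # sharing the caller's condition variable for state updates.
    def __init__(self, lock, thread_count):
        self.lock = lock          # the shared workflow_eval_lock
        self.in_flight = 0        # tasks added but not yet finished
        self.error = None         # first exception raised by any task
        self.task_queue = Queue.Queue()
        self.task_queue_threads = [threading.Thread(target=self.task_queue_func)
                                   for _ in xrange(thread_count)]
        for t in self.task_queue_threads:
            t.start()

    def task_queue_func(self):
        while True:
            task = self.task_queue.get()
            if task is None:      # poison pill: shut this worker down
                return
            try:
                task()
            except Exception as e:
                with self.lock:
                    self.error = e
            with self.lock:
                self.in_flight -= 1
                self.lock.notifyAll()   # wake the evaluation loop

    def add(self, task):
        with self.lock:
            self.in_flight += 1
        self.task_queue.put(task)

    def drain(self):
        # Discard tasks that have not started yet.
        try:
            while True:
                self.task_queue.get_nowait()
                with self.lock:
                    self.in_flight -= 1
        except Queue.Empty:
            pass

    def join(self):
        for t in self.task_queue_threads:
            self.task_queue.put(None)
        for t in self.task_queue_threads:
            t.join()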
@@ -17,12 +17,15 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
+import time
 
 from cwltool.errors import WorkflowException
 import cwltool.main
 import cwltool.workflow
 import cwltool.process
 from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
 
 import arvados
 import arvados.config
@@ -37,12 +40,13 @@ from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
 from ._version import __version__
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -53,17 +57,19 @@ arvados.log_handler.setFormatter(logging.Formatter(
         '%(asctime)s %(name)s %(levelname)s: %(message)s',
         '%Y-%m-%d %H:%M:%S'))
 
+DEFAULT_PRIORITY = 500
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+    def __init__(self, api_client, work_api=None, keep_client=None,
+                 output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
@@ -79,6 +85,8 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
+        self.thread_count = 4
+        self.poll_interval = 12
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -120,40 +128,58 @@ class ArvCwlRunner(object):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+            self.workflow_eval_lock.notifyAll()
+
+
+    def start_run(self, runnable, kwargs):
+        self.task_queue.add(partial(runnable.run, **kwargs))
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+
+    def process_done(self, uuid):
+        with self.workflow_eval_lock:
+            if uuid in self.processes:
+                del self.processes[uuid]
+
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+            self.workflow_eval_lock.notifyAll()
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
 
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
-                    uuid = event["object_uuid"]
-                    with self.lock:
+                uuid = event["object_uuid"]
+                if event["properties"]["new_attributes"]["state"] == "Running":
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.label(j), uuid)
-                        j.running = True
-                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        if j.running is False:
+                            j.running = True
+                            j.update_pipeline_component(event["properties"]["new_attributes"])
+                            logger.info("%s %s is Running", self.label(j), uuid)
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                    self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -165,15 +191,19 @@ class ArvCwlRunner(object):
         """
 
         try:
         """
 
         try:
+            remain_wait = self.poll_interval
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -183,6 +213,7 @@ class ArvCwlRunner(object):
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
@@ -193,12 +224,13 @@ class ArvCwlRunner(object):
                             "new_attributes": p
                         }
                     })
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notifyAll()
         finally:
             self.stop_polling.set()
 
@@ -234,6 +266,8 @@ class ArvCwlRunner(object):
                     if not obj.get("dockerOutputDirectory").startswith('/'):
                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                             "Option 'dockerOutputDirectory' must be an absolute path.")
                     if not obj.get("dockerOutputDirectory").startswith('/'):
                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                             "Option 'dockerOutputDirectory' must be an absolute path.")
+            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
             for v in obj.itervalues():
                 self.check_features(v)
         elif isinstance(obj, list):
@@ -351,7 +385,8 @@ class ArvCwlRunner(object):
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
-
+        self.secret_store = kwargs.get("secret_store")
+        self.thread_count = kwargs.get("thread_count", 4)
 
         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
@@ -372,11 +407,13 @@ class ArvCwlRunner(object):
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
+        # Don't validate this time because it will just print redundant errors.
         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata)
+                                  metadata=tool.metadata,
+                                  do_validate=False)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
@@ -415,16 +452,21 @@ class ArvCwlRunner(object):
 
         if self.work_api == "containers":
             if self.ignore_docker_for_reuse:
-                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
+                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
             kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
             kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
+            if kwargs["priority"] != DEFAULT_PRIORITY:
+                raise Exception("--priority not implemented for jobs API.")
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+            raise Exception("--priority must be in the range 1..1000.")
+
         runnerjob = None
         if kwargs.get("submit"):
             # Submit a runner job to run the workflow for us.
@@ -443,7 +485,9 @@ class ArvCwlRunner(object):
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"),
                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"),
                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
-                                                merged_map=merged_map)
+                                                merged_map=merged_map,
+                                                priority=kwargs.get("priority"),
+                                                secret_store=self.secret_store)
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
@@ -464,13 +508,15 @@ class ArvCwlRunner(object):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
         if runnerjob and not kwargs.get("wait"):
-            runnerjob.run(wait=kwargs.get("wait"))
+            runnerjob.run(**kwargs)
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
             return (runnerjob.uuid, "success")
 
         self.poll_api = arvados.api('v1')
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
+        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
@@ -481,10 +527,11 @@ class ArvCwlRunner(object):
                                **kwargs)
 
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Holds the lock while this code runs and releases it when
+            # it is safe to do so in self.workflow_eval_lock.wait(),
+            # at which point on_message can update job state and
+            # process output callbacks.
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -494,20 +541,25 @@ class ArvCwlRunner(object):
                 if self.stop_polling.is_set():
                     break
 
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.task_queue.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
-                self.cond.wait(1)
+            while (self.task_queue.in_flight + len(self.processes)) > 0:
+                if self.task_queue.error is not None:
+                    raise self.task_queue.error
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
@@ -523,9 +575,11 @@ class ArvCwlRunner(object):
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
+            self.task_queue.drain()
             self.stop_polling.set()
             self.polling_thread.join()
+            self.task_queue.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -581,7 +635,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--print-dot", action="store_true",
                          help="Print workflow visualization in graphviz format and exit")
-    exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
 
     exgroup = parser.add_mutually_exclusive_group()
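With action="version", argparse prints the version string and exits
during parse_args(), which is why the manual "if arvargs.version" block
is deleted from main() further down in this diff. Illustrative sketch
(the version string here is a placeholder):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--version", action="version",
                    version="arvados-cwl-runner 0.0.0 (placeholder)")
parser.parse_args(["--version"])   # prints the string, then raises SystemExit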
@@ -663,6 +717,21 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                         default=0)
 
                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                         default=0)
 
+    parser.add_argument("--priority", type=int,
+                        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+                        default=DEFAULT_PRIORITY)
+
+    parser.add_argument("--disable-validate", dest="do_validate",
+                        action="store_false", default=True,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--disable-js-validation",
+                        action="store_true", default=False,
+                        help=argparse.SUPPRESS)
+
+    parser.add_argument("--thread-count", type=int,
+                        default=4, help="Number of threads to use for job submit and output collection.")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",
@@ -671,14 +740,14 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
                         default=False, dest="trash_intermediate",
                         help="Do not trash intermediate outputs (default).")
 
-    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 def add_arv_hints():
-    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
-    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
@@ -699,10 +768,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     job_order_object = None
     arvargs = parser.parse_args(args)
 
-    if arvargs.version:
-        print versionstring()
-        return
-
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
@@ -723,7 +788,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
+            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+            keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
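Since TaskQueue now issues API calls from several worker threads, main()
builds the client with arvados.safeapi.ThreadSafeApiCache, which gives
each thread its own underlying API client object (a plain arvados.api('v1')
client is not safe to share across threads), and reuses the cache's Keep
client instead of constructing a second one. Usage as in the diff:

import arvados.safeapi
from arvados.api import OrderedJsonModel

api_client = arvados.safeapi.ThreadSafeApiCache(
    api_params={"model": OrderedJsonModel()},
    keep_params={"num_retries": 4})
keep_client = api_client.keep   # KeepClient shared with the cache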