From a82b39e1849a67854cb3399a4660b77548edd580 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 6 Apr 2018 10:29:30 -0400 Subject: [PATCH] 13108: Rename parallel_submit_count to thread_count Make sure it gets passed through to RunnerContainer. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- build/build-dev-docker-jobs-image.sh | 6 ++---- sdk/cwl/arvados_cwl/__init__.py | 14 +++++++------- sdk/cwl/arvados_cwl/arvcontainer.py | 24 ++++++++++++++---------- sdk/cwl/arvados_cwl/arvjob.py | 4 ++-- 4 files changed, 25 insertions(+), 23 deletions(-) diff --git a/build/build-dev-docker-jobs-image.sh b/build/build-dev-docker-jobs-image.sh index e1e5063f73..9393c1acce 100755 --- a/build/build-dev-docker-jobs-image.sh +++ b/build/build-dev-docker-jobs-image.sh @@ -22,10 +22,8 @@ EOF set -e if [[ -z "$WORKSPACE" ]] ; then - echo "$helpmessage" - echo - echo "Must set WORKSPACE" - exit 1 + export WORKSPACE=$(readlink -f $(dirname $0)/..) + echo "Using WORKSPACE $WORKSPACE" fi if [[ -z "$ARVADOS_API_HOST" || -z "$ARVADOS_API_TOKEN" ]] ; then diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index f70fa65fb9..0f916ee485 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -66,7 +66,7 @@ class ArvCwlRunner(object): def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4, - parallel_submit_count=4): + thread_count=4): self.api = api_client self.processes = {} self.in_flight = 0 @@ -88,7 +88,7 @@ class ArvCwlRunner(object): self.trash_intermediate = False self.task_queue = Queue.Queue() self.task_queue_threads = [] - self.parallel_submit_count = parallel_submit_count + self.thread_count = thread_count self.poll_interval = 12 if keep_client is not None: @@ -153,7 +153,7 @@ class ArvCwlRunner(object): task() def task_queue_add(self, task): - if self.parallel_submit_count > 1: + if self.thread_count > 1: self.task_queue.put(task) else: task() @@ -528,14 +528,14 @@ class ArvCwlRunner(object): logger.info("Pipeline instance %s", self.pipeline["uuid"]) if runnerjob and not kwargs.get("wait"): - runnerjob.run(wait=kwargs.get("wait")) + runnerjob.run(**kwargs) return (runnerjob.uuid, "success") self.poll_api = arvados.api('v1') self.polling_thread = threading.Thread(target=self.poll_states) self.polling_thread.start() - for r in xrange(0, self.parallel_submit_count): + for r in xrange(0, self.thread_count): t = threading.Thread(target=self.task_queue_func) self.task_queue_threads.append(t) t.start() @@ -755,8 +755,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser action="store_true", default=False, help=argparse.SUPPRESS) - parser.add_argument("--parallel-submit-count", type=int, - default=4, help="Submit requests in parallel (default 4)") + parser.add_argument("--thread-count", type=int, + default=4, help="Number of threads to use for job submit and output collection.") exgroup = parser.add_mutually_exclusive_group() exgroup.add_argument("--trash-intermediate", action="store_true", diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index eb333d05ae..afcf2db6a0 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -400,7 +400,18 @@ class RunnerContainer(Runner): # --api=containers means use the containers API # --no-log-timestamps means don't add timestamps (the logging infrastructure does this) # --disable-validate because we already validated so don't need to do it again - command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps", "--disable-validate"] + # --eval-timeout is the timeout for javascript invocation + # --parallel-task-count is the number of threads to use for job submission + # --enable/disable-reuse sets desired job reuse + command = ["arvados-cwl-runner", + "--local", + "--api=containers", + "--no-log-timestamps", + "--disable-validate", + "--eval-timeout=%s" % self.arvrunner.eval_timeout, + "--thread-count=%s" % self.arvrunner.thread_count, + "--enable-reuse" if self.enable_reuse else "--disable-reuse"] + if self.output_name: command.append("--output-name=" + self.output_name) container_req["output_name"] = self.output_name @@ -411,11 +422,6 @@ class RunnerContainer(Runner): if kwargs.get("debug"): command.append("--debug") - if self.enable_reuse: - command.append("--enable-reuse") - else: - command.append("--disable-reuse") - if self.on_error: command.append("--on-error=" + self.on_error) @@ -428,8 +434,6 @@ class RunnerContainer(Runner): if self.arvrunner.project_uuid: command.append("--project-uuid="+self.arvrunner.project_uuid) - command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout) - command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -437,9 +441,9 @@ class RunnerContainer(Runner): return container_req - def run(self, *args, **kwargs): + def run(self, **kwargs): kwargs["keepprefix"] = "keep:" - job_spec = self.arvados_job_spec(*args, **kwargs) + job_spec = self.arvados_job_spec(**kwargs) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) response = self.arvrunner.api.container_requests().create( diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py index e2df831c46..e222152a16 100644 --- a/sdk/cwl/arvados_cwl/arvjob.py +++ b/sdk/cwl/arvados_cwl/arvjob.py @@ -317,8 +317,8 @@ class RunnerJob(Runner): } } - def run(self, *args, **kwargs): - job_spec = self.arvados_job_spec(*args, **kwargs) + def run(self, **kwargs): + job_spec = self.arvados_job_spec(**kwargs) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) -- 2.30.2