Make sure it gets passed through to RunnerContainer.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>
set -e
if [[ -z "$WORKSPACE" ]] ; then
- echo "$helpmessage"
- echo
- echo "Must set WORKSPACE"
- exit 1
+ export WORKSPACE=$(readlink -f $(dirname $0)/..)
+ echo "Using WORKSPACE $WORKSPACE"
fi
if [[ -z "$ARVADOS_API_HOST" || -z "$ARVADOS_API_TOKEN" ]] ; then
def __init__(self, api_client, work_api=None, keep_client=None,
output_name=None, output_tags=None, num_retries=4,
- parallel_submit_count=4):
+ thread_count=4):
self.api = api_client
self.processes = {}
self.in_flight = 0
self.trash_intermediate = False
self.task_queue = Queue.Queue()
self.task_queue_threads = []
- self.parallel_submit_count = parallel_submit_count
+ self.thread_count = thread_count
self.poll_interval = 12
if keep_client is not None:
task()
def task_queue_add(self, task):
- if self.parallel_submit_count > 1:
+ if self.thread_count > 1:
self.task_queue.put(task)
else:
task()
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run(wait=kwargs.get("wait"))
+ runnerjob.run(**kwargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
- for r in xrange(0, self.parallel_submit_count):
+ for r in xrange(0, self.thread_count):
t = threading.Thread(target=self.task_queue_func)
self.task_queue_threads.append(t)
t.start()
action="store_true", default=False,
help=argparse.SUPPRESS)
- parser.add_argument("--parallel-submit-count", type=int,
- default=4, help="Submit requests in parallel (default 4)")
+ parser.add_argument("--thread-count", type=int,
+ default=4, help="Number of threads to use for job submit and output collection.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
# --api=containers means use the containers API
# --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
# --disable-validate because we already validated so don't need to do it again
- command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps", "--disable-validate"]
+ # --eval-timeout is the timeout for javascript invocation
+ # --parallel-task-count is the number of threads to use for job submission
+ # --enable/disable-reuse sets desired job reuse
+ command = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=%s" % self.arvrunner.eval_timeout,
+ "--thread-count=%s" % self.arvrunner.thread_count,
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+
if self.output_name:
command.append("--output-name=" + self.output_name)
container_req["output_name"] = self.output_name
if kwargs.get("debug"):
command.append("--debug")
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
if self.on_error:
command.append("--on-error=" + self.on_error)
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
- command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
-
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
return container_req
- def run(self, *args, **kwargs):
+ def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
response = self.arvrunner.api.container_requests().create(
}
}
- def run(self, *args, **kwargs):
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ def run(self, **kwargs):
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)