container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
response = self.arvrunner.api.container_requests().create(
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
class RunnerContainer(Runner):
# --api=containers means use the containers API
# --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
# --disable-validate because we already validated so don't need to do it again
- command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps", "--disable-validate"]
+ # --eval-timeout is the timeout for javascript invocation
+ # --parallel-task-count is the number of threads to use for job submission
+ # --enable/disable-reuse sets desired job reuse
+ command = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=%s" % self.arvrunner.eval_timeout,
+ "--thread-count=%s" % self.arvrunner.thread_count,
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+
if self.output_name:
command.append("--output-name=" + self.output_name)
container_req["output_name"] = self.output_name
if kwargs.get("debug"):
command.append("--debug")
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
if self.on_error:
command.append("--on-error=" + self.on_error)
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
- command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
-
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
return container_req
- def run(self, *args, **kwargs):
+ def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
response = self.arvrunner.api.container_requests().create(
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
else:
super(RunnerContainer, self).done(container)
finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])