generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
separateDirs=False)
- logger.debug("generatemapper is %s", generatemapper._pathmap)
+ sorteditems = sorted(generatemapper.items(), None, key=lambda n: n[1].target)
+
+ logger.debug("generatemapper is %s", sorteditems)
with Perf(metrics, "createfiles %s" % self.name):
- for f, p in generatemapper.items():
+ for f, p in sorteditems:
if not p.target:
pass
elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
with Perf(metrics, "generatefiles.save_new %s" % self.name):
vwd.save_new()
- for f, p in generatemapper.items():
- if not p.target or self.arvrunner.secret_store.has_secret(p.resolved):
+ prev = None
+ for f, p in sorteditems:
+ if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or
+ (prev is not None and p.target.startswith(prev))):
continue
mountpoint = "%s/%s" % (self.outdir, p.target)
mounts[mountpoint] = {"kind": "collection",
"path": p.target}
if p.type.startswith("Writable"):
mounts[mountpoint]["writable"] = True
+ prev = p.target + "/"
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
response = self.arvrunner.api.container_requests().create(
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
class RunnerContainer(Runner):
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
- command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
+ # --local means execute the workflow instead of submitting a container request
+ # --api=containers means use the containers API
+ # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
+ # --disable-validate because we already validated so don't need to do it again
+ # --eval-timeout is the timeout for javascript invocation
+ # --parallel-task-count is the number of threads to use for job submission
+ # --enable/disable-reuse sets desired job reuse
+ command = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=%s" % self.arvrunner.eval_timeout,
+ "--thread-count=%s" % self.arvrunner.thread_count,
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+
if self.output_name:
command.append("--output-name=" + self.output_name)
container_req["output_name"] = self.output_name
if kwargs.get("debug"):
command.append("--debug")
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
if self.on_error:
command.append("--on-error=" + self.on_error)
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
- command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
-
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
return container_req
- def run(self, *args, **kwargs):
+ def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
response = self.arvrunner.api.container_requests().create(
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
else:
super(RunnerContainer, self).done(container)
finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])