X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6018e31ef8c2d344be0d596a1e8ca11d95bf0870..9c23603a2d852a563388c4b616fd7f1d01cbf5f1:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 39b9c2416a..afcf2db6a0 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -120,10 +120,12 @@ class ArvadosContainer(object): generatemapper = NoFollowPathMapper([self.generatefiles], "", "", separateDirs=False) - logger.debug("generatemapper is %s", generatemapper._pathmap) + sorteditems = sorted(generatemapper.items(), None, key=lambda n: n[1].target) + + logger.debug("generatemapper is %s", sorteditems) with Perf(metrics, "createfiles %s" % self.name): - for f, p in generatemapper.items(): + for f, p in sorteditems: if not p.target: pass elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"): @@ -155,8 +157,10 @@ class ArvadosContainer(object): with Perf(metrics, "generatefiles.save_new %s" % self.name): vwd.save_new() - for f, p in generatemapper.items(): - if not p.target or self.arvrunner.secret_store.has_secret(p.resolved): + prev = None + for f, p in sorteditems: + if (not p.target or self.arvrunner.secret_store.has_secret(p.resolved) or + (prev is not None and p.target.startswith(prev))): continue mountpoint = "%s/%s" % (self.outdir, p.target) mounts[mountpoint] = {"kind": "collection", @@ -164,6 +168,7 @@ class ArvadosContainer(object): "path": p.target} if p.type.startswith("Writable"): mounts[mountpoint]["writable"] = True + prev = p.target + "/" container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir} if self.environment: @@ -243,13 +248,15 @@ class ArvadosContainer(object): container_request["name"] = wfrecord["name"] container_request["properties"]["template_uuid"] = wfuuid + self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) + try: response = 
self.arvrunner.api.container_requests().create( body=container_request ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] - self.arvrunner.processes[self.uuid] = self + self.arvrunner.process_submitted(self) if response["state"] == "Final": logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) @@ -310,8 +317,7 @@ class ArvadosContainer(object): processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"]) class RunnerContainer(Runner): @@ -390,7 +396,22 @@ class RunnerContainer(Runner): container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33] - command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"] + # --local means execute the workflow instead of submitting a container request + # --api=containers means use the containers API + # --no-log-timestamps means don't add timestamps (the logging infrastructure does this) + # --disable-validate because we already validated so don't need to do it again + # --eval-timeout is the timeout for javascript invocation + # --thread-count is the number of threads to use for job submission + # --enable/disable-reuse sets desired job reuse + command = ["arvados-cwl-runner", + "--local", + "--api=containers", + "--no-log-timestamps", + "--disable-validate", + "--eval-timeout=%s" % self.arvrunner.eval_timeout, + "--thread-count=%s" % self.arvrunner.thread_count, + "--enable-reuse" if self.enable_reuse else "--disable-reuse"] + if self.output_name: command.append("--output-name=" + self.output_name) container_req["output_name"] = self.output_name @@ -401,11 +422,6 @@ class RunnerContainer(Runner): if kwargs.get("debug"): command.append("--debug") - if self.enable_reuse: - command.append("--enable-reuse") - else: - command.append("--disable-reuse") - if
self.on_error: command.append("--on-error=" + self.on_error) @@ -418,8 +434,6 @@ class RunnerContainer(Runner): if self.arvrunner.project_uuid: command.append("--project-uuid="+self.arvrunner.project_uuid) - command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout) - command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -427,9 +441,9 @@ class RunnerContainer(Runner): return container_req - def run(self, *args, **kwargs): + def run(self, **kwargs): kwargs["keepprefix"] = "keep:" - job_spec = self.arvados_job_spec(*args, **kwargs) + job_spec = self.arvados_job_spec(**kwargs) job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid) response = self.arvrunner.api.container_requests().create( @@ -437,7 +451,7 @@ class RunnerContainer(Runner): ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] - self.arvrunner.processes[self.uuid] = self + self.arvrunner.process_submitted(self) logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"]) @@ -455,5 +469,4 @@ class RunnerContainer(Runner): else: super(RunnerContainer, self).done(container) finally: - if record["uuid"] in self.arvrunner.processes: - del self.arvrunner.processes[record["uuid"]] + self.arvrunner.process_done(record["uuid"])