X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bc1f8d47b8233b00a449edb38f4c4bde8f5d9163..400210f8c9d8b111a3efdaa76c8be579ea5666cb:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 47fbfa5a79..aa3388d00b 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -123,6 +123,8 @@ class ArvadosContainer(JobBase): "kind": "collection", "portable_data_hash": pdh } + if pdh in self.pathmapper.pdh_to_uuid: + mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh] if len(sp) == 2: if tp == "Directory": path = sp[1] @@ -140,7 +142,7 @@ class ArvadosContainer(JobBase): generatemapper = NoFollowPathMapper(self.generatefiles["listing"], "", "", separateDirs=False) - sorteditems = sorted(list(generatemapper.items()), key=lambda n: n[1].target) + sorteditems = sorted(generatemapper.items(), key=lambda n: n[1].target) logger.debug("generatemapper is %s", sorteditems) @@ -153,7 +155,7 @@ class ArvadosContainer(JobBase): vwd.mkdirs(p.target) else: source, path = self.arvrunner.fs_access.get_collection(p.resolved) - vwd.copy(path, p.target, source_collection=source) + vwd.copy(path or ".", p.target, source_collection=source) elif p.type == "CreateFile": if self.arvrunner.secret_store.has_secret(p.resolved): secret_mounts["%s/%s" % (self.outdir, p.target)] = { @@ -162,7 +164,7 @@ class ArvadosContainer(JobBase): } else: with vwd.open(p.target, "w") as n: - n.write(p.resolved.encode("utf-8")) + n.write(p.resolved) def keepemptydirs(p): if isinstance(p, arvados.collection.RichCollectionBase): @@ -223,6 +225,10 @@ class ArvadosContainer(JobBase): runtimeContext.pull_image, runtimeContext.project_uuid) + network_req, _ = self.get_requirement("NetworkAccess") + if network_req: + runtime_constraints["API"] = network_req["networkAccess"] + api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement") if api_req: runtime_constraints["API"] = True @@ -254,7 +260,7 @@ class ArvadosContainer(JobBase): if self.output_ttl < 0: raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"]) - if self.timelimit is not None: + if self.timelimit is not None and self.timelimit > 0: scheduling_parameters["max_run_time"] = self.timelimit extra_submit_params = {} @@ -270,6 +276,9 @@ class ArvadosContainer(JobBase): enable_reuse = runtimeContext.enable_reuse if enable_reuse: + reuse_req, _ = self.get_requirement("WorkReuse") + if reuse_req: + enable_reuse = reuse_req["enableReuse"] reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement") if reuse_req: enable_reuse = reuse_req["enableReuse"] @@ -304,8 +313,8 @@ class ArvadosContainer(JobBase): logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) else: logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"]) - except Exception as e: - logger.error("%s got error %s" % (self.arvrunner.label(self), str(e))) + except Exception: + logger.exception("%s got an error", self.arvrunner.label(self)) self.output_callback({}, "permanentFail") def done(self, record): @@ -329,8 +338,8 @@ class ArvadosContainer(JobBase): else: processStatus = "permanentFail" - if processStatus == "permanentFail": - logc = arvados.collection.CollectionReader(container["log"], + if processStatus == "permanentFail" and record["log_uuid"]: + logc = arvados.collection.CollectionReader(record["log_uuid"], api_client=self.arvrunner.api, keep_client=self.arvrunner.keep_client, num_retries=self.arvrunner.num_retries) @@ -342,7 +351,7 @@ class ArvadosContainer(JobBase): if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: # Compute the trash time to avoid requesting the collection record. - trash_at = ciso8601.parse_datetime_unaware(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl) + trash_at = ciso8601.parse_datetime_as_naive(record["modified_at"]) + datetime.timedelta(0, self.arvrunner.intermediate_output_ttl) aftertime = " at %s" % trash_at.strftime("%Y-%m-%d %H:%M:%S UTC") if self.arvrunner.intermediate_output_ttl else "" orpart = ", or" if self.arvrunner.trash_intermediate and self.arvrunner.intermediate_output_ttl else "" oncomplete = " upon successful completion of the workflow" if self.arvrunner.trash_intermediate else "" @@ -353,11 +362,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") except WorkflowException as e: + # Only include a stack trace if in debug mode. + # A stack trace may obfuscate more useful output about the workflow. logger.error("%s unable to collect output from %s:\n%s", self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False)) processStatus = "permanentFail" - except Exception as e: - logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e) + except Exception: + logger.exception("%s while getting output object:", self.arvrunner.label(self)) processStatus = "permanentFail" finally: self.output_callback(outputs, processStatus) @@ -481,6 +492,9 @@ class RunnerContainer(Runner): if self.arvrunner.project_uuid: command.append("--project-uuid="+self.arvrunner.project_uuid) + if self.enable_dev: + command.append("--enable-dev") + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -499,6 +513,9 @@ class RunnerContainer(Runner): extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster if runtimeContext.submit_request_uuid: + if "cluster_id" in extra_submit_params: + # Doesn't make sense for "update" and actually fails + del extra_submit_params["cluster_id"] response = self.arvrunner.api.container_requests().update( uuid=runtimeContext.submit_request_uuid, body=job_spec, @@ -520,8 +537,9 @@ class RunnerContainer(Runner): container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] ).execute(num_retries=self.arvrunner.num_retries) - except Exception as e: - logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e) + container["log"] = record["log_uuid"] + except Exception: + logger.exception("%s while getting runner container", self.arvrunner.label(self)) self.arvrunner.output_callback({}, "permanentFail") else: super(RunnerContainer, self).done(container)