X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9e76a12ff0b25322f86caf6d5ea70c09cbfd8829..cd6cc155469fb54cd7d868e5bc331f13805b79c9:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index aafbc38fc6..f048e505e8 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -27,6 +27,9 @@ from cwltool.job import JobBase import arvados.collection +import crunchstat_summary.summarizer +import crunchstat_summary.reader + from .arvdocker import arv_docker_get_image from . import done from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder @@ -367,11 +370,16 @@ class ArvadosContainer(JobBase): logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.", self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510") - ramMultiplier = [1] + ram_multiplier = [1] oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry") - if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'): - ramMultiplier.append(oom_retry_req.get('memoryRetryMultipler')) + if oom_retry_req: + if oom_retry_req.get('memoryRetryMultiplier'): + ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier')) + elif oom_retry_req.get('memoryRetryMultipler'): + ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler')) + else: + ram_multiplier.append(2) if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] @@ -388,7 +396,7 @@ class ArvadosContainer(JobBase): self.uuid = runtimeContext.submit_request_uuid - for i in ramMultiplier: + for i in ram_multiplier: runtime_constraints["ram"] = ram * i if self.uuid: @@ -408,7 +416,7 @@ class ArvadosContainer(JobBase): break if response["container_uuid"] is None: - runtime_constraints["ram"] = ram * ramMultiplier[self.attempt_count] + runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count] container_request["state"] = "Committed" response = self.arvrunner.api.container_requests().update( @@ -459,6 +467,7 @@ class ArvadosContainer(JobBase): def done(self, record): outputs = {} retried = False + rcode = None try: container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] @@ -477,7 +486,7 @@ class ArvadosContainer(JobBase): processStatus = "permanentFail" if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container): - logger.info("%s Container failed with out of memory error, retrying with more RAM.", + logger.warning("%s Container failed with out of memory error, retrying with more RAM.", self.arvrunner.label(self)) self.job_runtime.submit_request_uuid = None self.uuid = None @@ -486,20 +495,23 @@ class ArvadosContainer(JobBase): return if rcode == 137: - logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.", + logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.", self.arvrunner.label(self)) else: processStatus = "permanentFail" - if processStatus == "permanentFail" and record["log_uuid"]: - logc = arvados.collection.CollectionReader(record["log_uuid"], - api_client=self.arvrunner.api, - keep_client=self.arvrunner.keep_client, - num_retries=self.arvrunner.num_retries) + logc = None + if record["log_uuid"]: + logc = arvados.collection.Collection(record["log_uuid"], + api_client=self.arvrunner.api, + keep_client=self.arvrunner.keep_client, + num_retries=self.arvrunner.num_retries) + + if processStatus == "permanentFail" and logc is not None: label = self.arvrunner.label(self) done.logtail( logc, logger.error, - "%s (%s) error log:" % (label, record["uuid"]), maxlen=40) + "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127)) if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: @@ -521,6 +533,28 @@ class ArvadosContainer(JobBase): uuid=self.uuid, body={"container_request": {"properties": properties}} ).execute(num_retries=self.arvrunner.num_retries) + + if logc is not None and self.job_runtime.enable_usage_report is not False: + try: + summarizer = crunchstat_summary.summarizer.ContainerRequestSummarizer( + record, + collection_object=logc, + label=self.name, + arv=self.arvrunner.api) + summarizer.run() + with logc.open("usage_report.html", "wt") as mr: + mr.write(summarizer.html_report()) + logc.save() + + # Post warnings about nodes that are under-utilized. + for rc in summarizer._recommend_gen(lambda x: x): + self.usage_report_notes.append(rc) + + except Exception as e: + logger.warning("%s unable to generate resource usage report", + self.arvrunner.label(self), + exc_info=(e if self.arvrunner.debug else False)) + except WorkflowException as e: # Only include a stack trace if in debug mode. # A stack trace may obfuscate more useful output about the workflow. @@ -559,13 +593,19 @@ class RunnerContainer(Runner): } self.job_order[param] = {"$include": mnt} + container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext) + + workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources") + if workflow_runner_req and workflow_runner_req.get("acrContainerImage"): + container_image = workflow_runner_req.get("acrContainerImage") + container_req = { "name": self.name, "output_path": "/var/spool/cwl", "cwd": "/var/spool/cwl", "priority": self.priority, "state": "Committed", - "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext), + "container_image": container_image, "mounts": { "/var/lib/cwl/cwl.input.json": { "kind": "json", @@ -586,7 +626,7 @@ class RunnerContainer(Runner): "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, - "use_existing": False, # Never reuse the runner container - see #15497. + "use_existing": self.reuse_runner, "properties": {} } @@ -610,6 +650,8 @@ class RunnerContainer(Runner): "content": packed } container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + elif self.embedded_tool.tool.get("id", "").startswith("file:"): + raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id")) else: main = self.loadingContext.loader.idx["_:main"] if main.get("id") == "_:main": @@ -690,6 +732,12 @@ class RunnerContainer(Runner): if runtimeContext.prefer_cached_downloads: command.append("--prefer-cached-downloads") + if runtimeContext.enable_usage_report is True: + command.append("--enable-usage-report") + + if runtimeContext.enable_usage_report is False: + command.append("--disable-usage-report") + if self.fast_parser: command.append("--fast-parser") @@ -730,14 +778,9 @@ class RunnerContainer(Runner): logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"]) - workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"] workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"] - url = "" if workbench2: url = "{}processes/{}".format(workbench2, response["uuid"]) - elif workbench1: - url = "{}container_requests/{}".format(workbench1, response["uuid"]) - if url: logger.info("Monitor workflow progress at %s", url)