X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2de2c96925cc3439305f16dced7f89bd9124853d..refs/heads/20455-noopener:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 742906c616..584ca1713a 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -15,6 +15,7 @@ import datetime import ciso8601 import uuid import math +import re import arvados_cwl.util import ruamel.yaml @@ -56,6 +57,7 @@ class ArvadosContainer(JobBase): self.job_runtime = job_runtime self.running = False self.uuid = None + self.attempt_count = 0 def update_pipeline_component(self, r): pass @@ -88,7 +90,7 @@ class ArvadosContainer(JobBase): container_request["output_path"] = self.outdir container_request["cwd"] = self.outdir container_request["priority"] = runtimeContext.priority - container_request["state"] = "Committed" + container_request["state"] = "Uncommitted" container_request.setdefault("properties", {}) container_request["properties"]["cwl_input"] = self.joborder @@ -365,6 +367,17 @@ class ArvadosContainer(JobBase): logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.", self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510") + ram_multiplier = [1] + + oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry") + if oom_retry_req: + if oom_retry_req.get('memoryRetryMultiplier'): + ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier')) + elif oom_retry_req.get('memoryRetryMultipler'): + ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler')) + else: + ram_multiplier.append(2) + if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) @@ -372,23 +385,45 @@ class ArvadosContainer(JobBase): container_request["name"] = wfrecord["name"] container_request["properties"]["template_uuid"] = wfuuid - self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) + if self.attempt_count == 0: + self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) try: - if runtimeContext.submit_request_uuid: - response = self.arvrunner.api.container_requests().update( - uuid=runtimeContext.submit_request_uuid, - body=container_request, - **extra_submit_params - ).execute(num_retries=self.arvrunner.num_retries) - else: - response = self.arvrunner.api.container_requests().create( - body=container_request, - **extra_submit_params - ).execute(num_retries=self.arvrunner.num_retries) + ram = runtime_constraints["ram"] + + self.uuid = runtimeContext.submit_request_uuid + + for i in ram_multiplier: + runtime_constraints["ram"] = ram * i + + if self.uuid: + response = self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body=container_request, + **extra_submit_params + ).execute(num_retries=self.arvrunner.num_retries) + else: + response = self.arvrunner.api.container_requests().create( + body=container_request, + **extra_submit_params + ).execute(num_retries=self.arvrunner.num_retries) + self.uuid = response["uuid"] + + if response["container_uuid"] is not None: + break + + if response["container_uuid"] is None: + runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count] + + container_request["state"] = "Committed" + response = self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body=container_request, + **extra_submit_params + ).execute(num_retries=self.arvrunner.num_retries) - self.uuid = response["uuid"] self.arvrunner.process_submitted(self) + self.attempt_count += 1 if response["state"] == "Final": logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) @@ -399,8 +434,37 @@ class ArvadosContainer(JobBase): logger.debug("Container request was %s", container_request) self.output_callback({}, "permanentFail") + def out_of_memory_retry(self, record, container): + oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry") + if oom_retry_req is None: + return False + + # Sometimes it gets killed with no warning + if container["exit_code"] == 137: + return True + + logc = arvados.collection.CollectionReader(record["log_uuid"], + api_client=self.arvrunner.api, + keep_client=self.arvrunner.keep_client, + num_retries=self.arvrunner.num_retries) + + loglines = [""] + def callback(v1, v2, v3): + loglines[0] = v3 + + done.logtail(logc, callback, "", maxlen=1000) + + # Check allocation failure + oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)' + if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE): + return True + + return False + def done(self, record): outputs = {} + retried = False + rcode = None try: container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] @@ -418,8 +482,17 @@ class ArvadosContainer(JobBase): else: processStatus = "permanentFail" + if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container): + logger.warning("%s Container failed with out of memory error, retrying with more RAM.", + self.arvrunner.label(self)) + self.job_runtime.submit_request_uuid = None + self.uuid = None + self.run(None) + retried = True + return + if rcode == 137: - logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.", + logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.", self.arvrunner.label(self)) else: processStatus = "permanentFail" @@ -432,7 +505,7 @@ class ArvadosContainer(JobBase): label = self.arvrunner.label(self) done.logtail( logc, logger.error, - "%s (%s) error log:" % (label, record["uuid"]), maxlen=40) + "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127)) if record["output_uuid"]: if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl: @@ -464,7 +537,8 @@ class ArvadosContainer(JobBase): logger.exception("%s while getting output object:", self.arvrunner.label(self)) processStatus = "permanentFail" finally: - self.output_callback(outputs, processStatus) + if not retried: + self.output_callback(outputs, processStatus) class RunnerContainer(Runner): @@ -491,13 +565,19 @@ class RunnerContainer(Runner): } self.job_order[param] = {"$include": mnt} + container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext) + + workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources") + if workflow_runner_req and workflow_runner_req.get("acrContainerImage"): + container_image = workflow_runner_req.get("acrContainerImage") + container_req = { "name": self.name, "output_path": "/var/spool/cwl", "cwd": "/var/spool/cwl", "priority": self.priority, "state": "Committed", - "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext), + "container_image": container_image, "mounts": { "/var/lib/cwl/cwl.input.json": { "kind": "json", @@ -518,7 +598,7 @@ class RunnerContainer(Runner): "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)), "API": True }, - "use_existing": False, # Never reuse the runner container - see #15497. + "use_existing": self.reuse_runner, "properties": {} } @@ -542,6 +622,8 @@ class RunnerContainer(Runner): "content": packed } container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + elif self.embedded_tool.tool.get("id", "").startswith("file:"): + raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id")) else: main = self.loadingContext.loader.idx["_:main"] if main.get("id") == "_:main": @@ -662,14 +744,9 @@ class RunnerContainer(Runner): logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"]) - workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"] workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"] - url = "" if workbench2: url = "{}processes/{}".format(workbench2, response["uuid"]) - elif workbench1: - url = "{}container_requests/{}".format(workbench1, response["uuid"]) - if url: logger.info("Monitor workflow progress at %s", url)