X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/36cfafd6e7eae2784c22aefdd9df26783412d42a..39fc2f223fae40dc4fb160758e76ca39304b44af:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index f3e122e603..4d0fde7440 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -15,9 +15,10 @@ import datetime import ciso8601 import uuid import math +import re import arvados_cwl.util -import ruamel.yaml as yaml +import ruamel.yaml from cwltool.errors import WorkflowException from cwltool.process import UnsupportedRequirement, shortname @@ -37,6 +38,9 @@ from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +def cleanup_name_for_collection(name): + return name.replace("/", " ") + class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" @@ -53,6 +57,7 @@ class ArvadosContainer(JobBase): self.job_runtime = job_runtime self.running = False self.uuid = None + self.attempt_count = 0 def update_pipeline_component(self, r): pass @@ -85,9 +90,11 @@ class ArvadosContainer(JobBase): container_request["output_path"] = self.outdir container_request["cwd"] = self.outdir container_request["priority"] = runtimeContext.priority - container_request["state"] = "Committed" + container_request["state"] = "Uncommitted" container_request.setdefault("properties", {}) + container_request["properties"]["cwl_input"] = self.joborder + runtime_constraints = {} if runtimeContext.project_uuid: @@ -245,11 +252,7 @@ class ArvadosContainer(JobBase): container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, runtimeContext.pull_image, - runtimeContext.project_uuid, - runtimeContext.force_docker_pull, - runtimeContext.tmp_outdir_prefix, - runtimeContext.match_local_docker, - runtimeContext.copy_deps) + runtimeContext) network_req, _ = self.get_requirement("NetworkAccess") if network_req: @@ -259,10 +262,22 @@ class ArvadosContainer(JobBase): if api_req: runtime_constraints["API"] = True + use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0) + + keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement") + if keep_cache_type_req: + if "keepCacheType" in keep_cache_type_req: + if keep_cache_type_req["keepCacheType"] == "ram_cache": + use_disk_cache = False + runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: - runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) + if use_disk_cache: + # If DefaultKeepCacheRAM is zero it means we should use disk cache. + runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20) + else: + runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) if "outputDirType" in runtime_req: if runtime_req["outputDirType"] == "local_output_dir": # Currently the default behavior. @@ -320,7 +335,7 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_runner_cluster: extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster - container_request["output_name"] = "Output from step %s" % (self.name) + container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name)) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -362,20 +377,40 @@ class ArvadosContainer(JobBase): self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback) try: - if runtimeContext.submit_request_uuid: - response = self.arvrunner.api.container_requests().update( - uuid=runtimeContext.submit_request_uuid, - body=container_request, - **extra_submit_params - ).execute(num_retries=self.arvrunner.num_retries) - else: - response = self.arvrunner.api.container_requests().create( - body=container_request, - **extra_submit_params - ).execute(num_retries=self.arvrunner.num_retries) + ram = runtime_constraints["ram"] + + for i in range(1, 4): + runtime_constraints["ram"] = ram * i + + if runtimeContext.submit_request_uuid: + response = self.arvrunner.api.container_requests().update( + uuid=runtimeContext.submit_request_uuid, + body=container_request, + **extra_submit_params + ).execute(num_retries=self.arvrunner.num_retries) + else: + response = self.arvrunner.api.container_requests().create( + body=container_request, + **extra_submit_params + ).execute(num_retries=self.arvrunner.num_retries) + runtimeContext.submit_request_uuid = response["uuid"] + + if response["container_uuid"] is not None: + break + + if response["container_uuid"] is None: + runtime_constraints["ram"] = ram * (self.attempt_count+1) + + container_request["state"] = "Committed" + response = self.arvrunner.api.container_requests().update( + uuid=runtimeContext.submit_request_uuid, + body=container_request, + **extra_submit_params + ).execute(num_retries=self.arvrunner.num_retries) self.uuid = response["uuid"] self.arvrunner.process_submitted(self) + self.attempt_count += 1 if response["state"] == "Final": logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"]) @@ -386,8 +421,35 @@ class ArvadosContainer(JobBase): logger.debug("Container request was %s", container_request) self.output_callback({}, "permanentFail") + def out_of_memory_retry(self, record, container): + if container["exit_code"] == 137: + return True + + logc = arvados.collection.CollectionReader(record["log_uuid"], + api_client=self.arvrunner.api, + keep_client=self.arvrunner.keep_client, + num_retries=self.arvrunner.num_retries) + + loglines = [""] + def callback(v1, v2, v3): + loglines[0] = v3 + + done.logtail(logc, callback, "", maxlen=200) + + oom_matches = r'(bad_alloc|out ?of ?memory|Container using over 95% of memory)' + + print("Checking loglines", loglines[0]) + + print("Match", re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE)) + + if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE): + return True + + return False + def done(self, record): outputs = {} + retried = False try: container = self.arvrunner.api.containers().get( uuid=record["container_uuid"] @@ -405,6 +467,14 @@ class ArvadosContainer(JobBase): else: processStatus = "permanentFail" + if processStatus == "permanentFail" and self.out_of_memory_retry(record, container): + logger.info("%s Container failed with out of memory error, retrying with more RAM.", + self.arvrunner.label(self)) + self.job_runtime.submit_request_uuid = None + self.run(None) + retried = True + return + if rcode == 137: logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.", self.arvrunner.label(self)) @@ -434,6 +504,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") + + properties = record["properties"].copy() + properties["cwl_output"] = outputs + self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body={"container_request": {"properties": properties}} + ).execute(num_retries=self.arvrunner.num_retries) except WorkflowException as e: # Only include a stack trace if in debug mode. # A stack trace may obfuscate more useful output about the workflow. @@ -444,13 +521,14 @@ class ArvadosContainer(JobBase): logger.exception("%s while getting output object:", self.arvrunner.label(self)) processStatus = "permanentFail" finally: - self.output_callback(outputs, processStatus) + if not retried: + self.output_callback(outputs, processStatus) class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, runtimeContext): + def arvados_job_spec(self, runtimeContext, git_info): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -511,15 +589,28 @@ class RunnerContainer(Runner): "kind": "collection", "portable_data_hash": "%s" % workflowcollection } + elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"]) + workflowpath = "/var/lib/cwl/workflow.json#" + frg + packedtxt = self.loadingContext.loader.fetch_text(uuid) + yaml = ruamel.yaml.YAML(typ='safe', pure=True) + packed = yaml.load(packedtxt) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "content": packed + } + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext) + main = self.loadingContext.loader.idx["_:main"] + if main.get("id") == "_:main": + del main["id"] workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", - "content": packed + "content": main } - if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + + container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()}) properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") if properties_req: @@ -583,6 +674,15 @@ class RunnerContainer(Runner): if runtimeContext.enable_preemptible is False: command.append("--disable-preemptible") + if runtimeContext.varying_url_params: + command.append("--varying-url-params="+runtimeContext.varying_url_params) + + if runtimeContext.prefer_cached_downloads: + command.append("--prefer-cached-downloads") + + if self.fast_parser: + command.append("--fast-parser") + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -592,7 +692,7 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" - job_spec = self.arvados_job_spec(runtimeContext) + job_spec = self.arvados_job_spec(runtimeContext, self.git_info) if runtimeContext.project_uuid: job_spec["owner_uuid"] = runtimeContext.project_uuid