Merge branch '21535-multi-wf-delete'
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ee0b10d1409fe9e68203e131b87fd7030d827672..34b79d67b47aaf0a51ed70a3fd99e2d16457abde 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -2,10 +2,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-
 import logging
 import json
 import os
@@ -15,6 +11,7 @@ import datetime
 import ciso8601
 import uuid
 import math
+import re
 
 import arvados_cwl.util
 import ruamel.yaml
@@ -26,6 +23,9 @@ from cwltool.job import JobBase
 
 import arvados.collection
 
+import crunchstat_summary.summarizer
+import crunchstat_summary.reader
+
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
@@ -56,6 +56,7 @@ class ArvadosContainer(JobBase):
         self.job_runtime = job_runtime
         self.running = False
         self.uuid = None
+        self.attempt_count = 0
 
     def update_pipeline_component(self, r):
         pass
@@ -88,7 +89,7 @@ class ArvadosContainer(JobBase):
         container_request["output_path"] = self.outdir
         container_request["cwd"] = self.outdir
         container_request["priority"] = runtimeContext.priority
-        container_request["state"] = "Committed"
+        container_request["state"] = "Uncommitted"
         container_request.setdefault("properties", {})
 
         container_request["properties"]["cwl_input"] = self.joborder
@@ -365,6 +366,17 @@ class ArvadosContainer(JobBase):
                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
 
+        ram_multiplier = [1]
+
+        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+        if oom_retry_req:
+            if oom_retry_req.get('memoryRetryMultiplier'):
+                ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
+            elif oom_retry_req.get('memoryRetryMultipler'):
+                ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+            else:
+                ram_multiplier.append(2)
+
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
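
The multiplier list above is driven by an arv:OutOfMemoryRetry hint on the tool. As returned by get_requirement() the hint is a plain dict, so the selection logic reduces to the sketch below (example values are hypothetical; the second key is the older misspelled form, presumably accepted for backward compatibility):

    oom_retry_req = {
        "class": "http://arvados.org/cwl#OutOfMemoryRetry",
        "memoryRetryMultiplier": 3,              # hypothetical example value
        "memoryErrorRegex": "OutOfMemoryError",  # hypothetical example value
    }
    multiplier = (oom_retry_req.get("memoryRetryMultiplier")
                  or oom_retry_req.get("memoryRetryMultipler")  # legacy spelling
                  or 2)                                         # default: double the RAM
    ram_multiplier = [1, multiplier]
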
@@ -372,23 +384,45 @@ class ArvadosContainer(JobBase):
                 container_request["name"] = wfrecord["name"]
             container_request["properties"]["template_uuid"] = wfuuid
 
-        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+        if self.attempt_count == 0:
+            self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
 
         try:
-            if runtimeContext.submit_request_uuid:
-                response = self.arvrunner.api.container_requests().update(
-                    uuid=runtimeContext.submit_request_uuid,
-                    body=container_request,
-                    **extra_submit_params
-                ).execute(num_retries=self.arvrunner.num_retries)
-            else:
-                response = self.arvrunner.api.container_requests().create(
-                    body=container_request,
-                    **extra_submit_params
-                ).execute(num_retries=self.arvrunner.num_retries)
+            ram = runtime_constraints["ram"]
+
+            self.uuid = runtimeContext.submit_request_uuid
+
+            for i in ram_multiplier:
+                runtime_constraints["ram"] = ram * i
+
+                if self.uuid:
+                    response = self.arvrunner.api.container_requests().update(
+                        uuid=self.uuid,
+                        body=container_request,
+                        **extra_submit_params
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                else:
+                    response = self.arvrunner.api.container_requests().create(
+                        body=container_request,
+                        **extra_submit_params
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                    self.uuid = response["uuid"]
+
+                if response["container_uuid"] is not None:
+                    break
+
+            if response["container_uuid"] is None:
+                runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
+
+            container_request["state"] = "Committed"
+            response = self.arvrunner.api.container_requests().update(
+                uuid=self.uuid,
+                body=container_request,
+                **extra_submit_params
+            ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.uuid = response["uuid"]
             self.arvrunner.process_submitted(self)
+            self.attempt_count += 1
 
             if response["state"] == "Final":
                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
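
With the request now created "Uncommitted" (see the earlier hunk changing the initial state), the loop above tries each RAM size from ram_multiplier until the API reports a container_uuid, presumably so an existing reusable container can be matched before the request is committed; only then is the state flipped to "Committed". A minimal sketch of that control flow, with a hypothetical submit() helper standing in for the container_requests().create()/update() calls:

    def submit_with_ram_retry(submit, container_request, runtime_constraints,
                              ram_multiplier, attempt_count):
        # Sketch only; submit() is a stand-in, not an arvados-cwl-runner API.
        base_ram = runtime_constraints["ram"]
        response = None
        for m in ram_multiplier:
            runtime_constraints["ram"] = base_ram * m
            response = submit(container_request)        # state is still "Uncommitted"
            if response["container_uuid"] is not None:
                break                                   # a container matched at this size
        if response["container_uuid"] is None:
            # Nothing matched; fall back to the multiplier for the current attempt.
            runtime_constraints["ram"] = base_ram * ram_multiplier[attempt_count]
        container_request["state"] = "Committed"
        return submit(container_request)
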
@@ -399,8 +433,37 @@ class ArvadosContainer(JobBase):
             logger.debug("Container request was %s", container_request)
             self.output_callback({}, "permanentFail")
 
+    def out_of_memory_retry(self, record, container):
+        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+        if oom_retry_req is None:
+            return False
+
+        # Sometimes it gets killed with no warning
+        if container["exit_code"] == 137:
+            return True
+
+        logc = arvados.collection.CollectionReader(record["log_uuid"],
+                                                   api_client=self.arvrunner.api,
+                                                   keep_client=self.arvrunner.keep_client,
+                                                   num_retries=self.arvrunner.num_retries)
+
+        loglines = [""]
+        def callback(v1, v2, v3):
+            loglines[0] = v3
+
+        done.logtail(logc, callback, "", maxlen=1000)
+
+        # Check allocation failure
+        oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
+        if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
+            return True
+
+        return False
+
     def done(self, record):
         outputs = {}
+        retried = False
+        rcode = None
         try:
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
@@ -418,21 +481,33 @@ class ArvadosContainer(JobBase):
                 else:
                     processStatus = "permanentFail"
 
+                if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
+                    logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
+                                 self.arvrunner.label(self))
+                    self.job_runtime.submit_request_uuid = None
+                    self.uuid = None
+                    self.run(None)
+                    retried = True
+                    return
+
                 if rcode == 137:
-                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
+                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
                                  self.arvrunner.label(self))
             else:
                 processStatus = "permanentFail"
 
-            if processStatus == "permanentFail" and record["log_uuid"]:
-                logc = arvados.collection.CollectionReader(record["log_uuid"],
-                                                           api_client=self.arvrunner.api,
-                                                           keep_client=self.arvrunner.keep_client,
-                                                           num_retries=self.arvrunner.num_retries)
+            logc = None
+            if record["log_uuid"]:
+                logc = arvados.collection.Collection(record["log_uuid"],
+                                                     api_client=self.arvrunner.api,
+                                                     keep_client=self.arvrunner.keep_client,
+                                                     num_retries=self.arvrunner.num_retries)
+
+            if processStatus == "permanentFail" and logc is not None:
                 label = self.arvrunner.label(self)
                 done.logtail(
                     logc, logger.error,
-                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
 
             if record["output_uuid"]:
                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
@@ -454,6 +529,28 @@ class ArvadosContainer(JobBase):
                 uuid=self.uuid,
                 body={"container_request": {"properties": properties}}
             ).execute(num_retries=self.arvrunner.num_retries)
+
+            if logc is not None and self.job_runtime.enable_usage_report is not False:
+                try:
+                    summarizer = crunchstat_summary.summarizer.ContainerRequestSummarizer(
+                        record,
+                        collection_object=logc,
+                        label=self.name,
+                        arv=self.arvrunner.api)
+                    summarizer.run()
+                    with logc.open("usage_report.html", "wt") as mr:
+                        mr.write(summarizer.html_report())
+                    logc.save()
+
+                    # Post warnings about nodes that are under-utilized.
+                    for rc in summarizer._recommend_gen(lambda x: x):
+                        self.job_runtime.usage_report_notes.append(rc)
+
+                except Exception as e:
+                    logger.warning("%s unable to generate resource usage report",
+                                 self.arvrunner.label(self),
+                                 exc_info=(e if self.arvrunner.debug else False))
+
         except WorkflowException as e:
             # Only include a stack trace if in debug mode.
             # A stack trace may obfuscate more useful output about the workflow.
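
The usage report generated above is written as usage_report.html into the container request's log collection. A hedged sketch of retrieving it afterwards with the Python SDK (the request UUID is a placeholder):

    import arvados
    import arvados.collection

    api = arvados.api("v1")
    cr = api.container_requests().get(uuid="zzzzz-xvhdp-012345678901234").execute()
    logc = arvados.collection.CollectionReader(cr["log_uuid"], api_client=api)
    with logc.open("usage_report.html") as f:   # text mode by default
        html_report = f.read()
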
@@ -464,7 +561,8 @@ class ArvadosContainer(JobBase):
             logger.exception("%s while getting output object:", self.arvrunner.label(self))
             processStatus = "permanentFail"
         finally:
-            self.output_callback(outputs, processStatus)
+            if not retried:
+                self.output_callback(outputs, processStatus)
 
 
 class RunnerContainer(Runner):
@@ -491,13 +589,19 @@ class RunnerContainer(Runner):
                 }
                 self.job_order[param] = {"$include": mnt}
 
+        container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
+
+        workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+            container_image = workflow_runner_req.get("acrContainerImage")
+
         container_req = {
             "name": self.name,
             "output_path": "/var/spool/cwl",
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+            "container_image": container_image,
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
@@ -518,7 +622,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": False, # Never reuse the runner container - see #15497.
+            "use_existing": self.reuse_runner,
             "properties": {}
         }
 
@@ -542,8 +646,12 @@ class RunnerContainer(Runner):
                 "content": packed
             }
             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+        elif self.embedded_tool.tool.get("id", "").startswith("file:"):
+            raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
         else:
             main = self.loadingContext.loader.idx["_:main"]
+            if main.get("id") == "_:main":
+                del main["id"]
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
@@ -620,6 +728,12 @@ class RunnerContainer(Runner):
         if runtimeContext.prefer_cached_downloads:
             command.append("--prefer-cached-downloads")
 
+        if runtimeContext.enable_usage_report is True:
+            command.append("--enable-usage-report")
+
+        if runtimeContext.enable_usage_report is False:
+            command.append("--disable-usage-report")
+
         if self.fast_parser:
             command.append("--fast-parser")
 
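
The flag forwarding above is effectively tri-state: enable_usage_report presumably defaults to None, in which case neither flag is passed and the submitted runner keeps its own default, while an explicit True or False adds the corresponding flag. A minimal sketch of that pattern:

    def usage_report_flags(enable_usage_report):
        # None -> inherit the default; True/False -> pass an explicit flag.
        if enable_usage_report is True:
            return ["--enable-usage-report"]
        if enable_usage_report is False:
            return ["--disable-usage-report"]
        return []
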
@@ -660,14 +774,9 @@ class RunnerContainer(Runner):
 
         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
 
-        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
-        url = ""
         if workbench2:
             url = "{}processes/{}".format(workbench2, response["uuid"])
-        elif workbench1:
-            url = "{}container_requests/{}".format(workbench1, response["uuid"])
-        if url:
             logger.info("Monitor workflow progress at %s", url)