13773: Intercept logging calls to update runtime_status on runner containers.
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Fri, 7 Sep 2018 20:57:21 +0000 (17:57 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Fri, 7 Sep 2018 20:57:21 +0000 (17:57 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/tests/test_submit.py

index b73b768fb8112a638a886485a81a24f6e7031f11..e1bbcc2a786ae3522715d9d7289b7f29b7d09ddc 100644 (file)
@@ -38,7 +38,7 @@ import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
@@ -46,6 +46,7 @@ from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
 from .context import ArvLoadingContext, ArvRuntimeContext
+from .util import get_current_container
 from ._version import __version__
 
 from cwltool.pack import pack
@@ -65,6 +66,38 @@ arvados.log_handler.setFormatter(logging.Formatter(
 
 DEFAULT_PRIORITY = 500
 
+class RuntimeStatusLoggingHandler(logging.Handler):
+    """
+    Intercepts logging calls and reports them as runtime statuses on runner
+    containers, by passing each record to the update callable given at init.
+    """
+    def __init__(self, runtime_status_update_func):
+        super(RuntimeStatusLoggingHandler, self).__init__()
+        self.runtime_status_update = runtime_status_update_func  # called as func(kind, message[, detail])
+
+    def emit(self, record):
+        kind = None  # stays None (nothing reported) for any level other than the three below
+        if record.levelno == logging.ERROR:
+            kind = 'error'
+        elif record.levelno == logging.WARNING:
+            kind = 'warning'
+        elif record.levelno == logging.INFO:
+            kind = 'activity'
+        if kind is not None:
+            log_msg = record.getMessage()
+            if '\n' in log_msg:
+                # Multi-line message: report a one-line summary and pass the full text as the detail
+                self.runtime_status_update(
+                    kind,
+                    "%s from %s (please see details)" % (kind, record.name),
+                    log_msg
+                )
+            else:
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, record.getMessage())
+                )
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
@@ -155,6 +188,12 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
         self.loadingContext.construct_tool_object = self.arv_make_tool
 
+        # Add a custom logging handler to the root logger for runtime status reporting
+        # if running inside a container
+        if get_current_container(self.api, self.num_retries, logger):
+            root_logger = logging.getLogger('')
+            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+            root_logger.addHandler(handler)
 
     def arv_make_tool(self, toolpath_object, loadingContext):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
@@ -203,12 +242,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         activity statuses.
         """
         with self.workflow_eval_lock:
-            try:
-                current = self.api.containers().current().execute(num_retries=self.num_retries)
-            except ApiError as e:
-                # Status code 404 just means we're not running in a container.
-                if e.resp.status != 404:
-                    logger.info("Getting current container: %s", e)
+            current = get_current_container(self.api, self.num_retries, logger)
+            if current is None:
                 return
             runtime_status = current.get('runtime_status', {})
             # In case of status being an error, only report the first one.
@@ -221,10 +256,14 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                 # Further errors are only mentioned as a count.
                 else:
                     # Get anything before an optional 'and N more' string.
-                    error_msg = re.match(
-                        r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
-                    more_failures = re.match(
-                        r'.*\(and (\d+) more\)', runtime_status.get('error'))
+                    try:
+                        error_msg = re.match(
+                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+                        more_failures = re.match(
+                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
+                    except TypeError:
+                        # Ignore tests stubbing errors
+                        return
                     if more_failures:
                         failure_qty = int(more_failures.groups()[0])
                         runtime_status.update({
@@ -438,12 +477,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
     def set_crunch_output(self):
         if self.work_api == "containers":
-            try:
-                current = self.api.containers().current().execute(num_retries=self.num_retries)
-            except ApiError as e:
-                # Status code 404 just means we're not running in a container.
-                if e.resp.status != 404:
-                    logger.info("Getting current container: %s", e)
+            current = get_current_container(self.api, self.num_retries, logger)
+            if current is None:
                 return
             try:
                 self.api.containers().update(uuid=current['uuid'],
index 8875b7d954d916e332175b5dbdd67103833719b1..f718a86b369f756be677d292f6b33c2d5261a975 100644 (file)
@@ -48,6 +48,7 @@ def stubs(func):
         stubs.keep_client = keep_client2
         stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
         stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+        stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
 
         stubs.api = mock.MagicMock()
         stubs.api._rootDesc = get_rootDesc()
@@ -56,6 +57,9 @@ def stubs(func):
             "uuid": stubs.fake_user_uuid,
         }
         stubs.api.collections().list().execute.return_value = {"items": []}
+        stubs.api.containers().current().execute.return_value = {
+            "uuid": stubs.fake_container_uuid,
+        }
 
         class CollectionExecute(object):
             def __init__(self, exe):