13773: Removed superfluous documentation
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 131795ee2c0173703a7385c8676d11536ff17398..5ea402a666742baa7ee7c8a870f30cac34685a6c 100644 (file)
@@ -38,7 +38,7 @@ import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-fromrunner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
@@ -46,6 +46,7 @@ from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
 from .context import ArvLoadingContext, ArvRuntimeContext
+from .util import get_current_container
 from ._version import __version__
 
 from cwltool.pack import pack
@@ -65,6 +66,38 @@ arvados.log_handler.setFormatter(logging.Formatter(
 
 DEFAULT_PRIORITY = 500
 
+class RuntimeStatusLoggingHandler(logging.Handler):
+    """
+    Intercepts logging calls and report them as runtime statuses on runner
+    containers.
+    """
+    def __init__(self, runtime_status_update_func):
+        super(RuntimeStatusLoggingHandler, self).__init__()
+        self.runtime_status_update = runtime_status_update_func
+
+    def emit(self, record):
+        kind = None
+        if record.levelno == logging.ERROR:
+            kind = 'error'
+        elif record.levelno == logging.WARNING:
+            kind = 'warning'
+        if kind is not None:
+            log_msg = record.getMessage()
+            if '\n' in log_msg:
+                # If the logged message is multi-line, use its first line as status
+                # and the rest as detail.
+                status, detail = log_msg.split('\n', 1)
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, status),
+                    detail
+                )
+            else:
+                self.runtime_status_update(
+                    kind,
+                    "%s: %s" % (record.name, record.getMessage())
+                )
+
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
     containers API), wait for them to complete, and report output.
@@ -133,13 +166,34 @@ class ArvCwlRunner(object):
             if arvargs.work_api is None:
                 raise Exception("No supported APIs")
             else:
-                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
+
+        if self.work_api == "jobs":
+            logger.warn("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
 
         self.loadingContext = ArvLoadingContext(vars(arvargs))
         self.loadingContext.fetcher_constructor = self.fetcher_constructor
         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
         self.loadingContext.construct_tool_object = self.arv_make_tool
 
+        # Add a custom logging handler to the root logger for runtime status reporting
+        # if running inside a container
+        if get_current_container(self.api, self.num_retries, logger):
+            root_logger = logging.getLogger('')
+            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+            root_logger.addHandler(handler)
 
     def arv_make_tool(self, toolpath_object, loadingContext):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
@@ -153,14 +207,13 @@ class ArvCwlRunner(object):
         with self.workflow_eval_lock:
             if processStatus == "success":
                 logger.info("Overall process status is %s", processStatus)
-                if self.pipeline:
-                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+                state = "Complete"
             else:
                 logger.error("Overall process status is %s", processStatus)
-                if self.pipeline:
-                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+                state = "Failed"
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                        body={"state": state}).execute(num_retries=self.num_retries)
             self.final_status = processStatus
             self.final_output = out
             self.workflow_eval_lock.notifyAll()
@@ -180,6 +233,70 @@ class ArvCwlRunner(object):
             self.task_queue.add(partial(j.done, record))
             del self.processes[uuid]
 
+    def runtime_status_update(self, kind, message, detail=None):
+        """
+        Updates the runtime_status field on the runner container.
+        Called from a failing child container: records the first child error
+        or updates the error count on subsequent error statuses.
+        Also called from other parts that need to report errros, warnings or just
+        activity statuses.
+        """
+        with self.workflow_eval_lock:
+            current = get_current_container(self.api, self.num_retries, logger)
+            if current is None:
+                return
+            runtime_status = current.get('runtime_status', {})
+            # In case of status being an error, only report the first one.
+            if kind == 'error':
+                if not runtime_status.get('error'):
+                    runtime_status.update({
+                        'error': message
+                    })
+                    if detail is not None:
+                        runtime_status.update({
+                            'errorDetail': detail
+                        })
+                # Further errors are only mentioned as a count.
+                else:
+                    # Get anything before an optional 'and N more' string.
+                    try:
+                        error_msg = re.match(
+                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+                        more_failures = re.match(
+                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
+                    except TypeError:
+                        # Ignore tests stubbing errors
+                        return
+                    if more_failures:
+                        failure_qty = int(more_failures.groups()[0])
+                        runtime_status.update({
+                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
+                        })
+                    else:
+                        runtime_status.update({
+                            'error': "%s (and 1 more)" % error_msg
+                        })
+            elif kind in ['warning', 'activity']:
+                # Record the last warning/activity status without regard of
+                # previous occurences.
+                runtime_status.update({
+                    kind: message
+                })
+                if detail is not None:
+                    runtime_status.update({
+                        kind+"Detail": detail
+                    })
+            else:
+                # Ignore any other status kind
+                return
+            try:
+                self.api.containers().update(uuid=current['uuid'],
+                                            body={
+                                                'runtime_status': runtime_status,
+                                            }).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.info("Couldn't update runtime_status: %s", e)
+
     def wrapped_callback(self, cb, obj, st):
         with self.workflow_eval_lock:
             cb(obj, st)
@@ -363,12 +480,8 @@ class ArvCwlRunner(object):
 
     def set_crunch_output(self):
         if self.work_api == "containers":
-            try:
-                current = self.api.containers().current().execute(num_retries=self.num_retries)
-            except ApiError as e:
-                # Status code 404 just means we're not running in a container.
-                if e.resp.status != 404:
-                    logger.info("Getting current container: %s", e)
+            current = get_current_container(self.api, self.num_retries, logger)
+            if current is None:
                 return
             try:
                 self.api.containers().update(uuid=current['uuid'],