from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
+from .util import get_current_container
from ._version import __version__
from cwltool.pack import pack
# Default priority value. NOTE(review): presumably the priority used for
# submitted work when none is given explicitly -- TODO confirm against callers.
DEFAULT_PRIORITY = 500
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.

    Records at WARNING level or above are forwarded to the callable given at
    construction time as ``runtime_status_update_func(kind, message[, detail])``:

    * ``kind`` is ``'error'`` for ERROR and above (including CRITICAL) and
      ``'warning'`` for WARNING up to, but not including, ERROR.
    * ``message`` is ``"<logger name>: <first line of the log message>"``.
    * ``detail`` is passed only for multi-line messages and holds every line
      after the first.

    Lower-severity records are ignored.
    """

    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        # True while a status update is in flight. The update callback may
        # itself log (e.g. an API client warning about a retried request);
        # without this guard such a record would re-enter emit() and recurse.
        # logging.Handler.handle() serializes emit() calls on the handler's
        # lock, so the flag only trips on same-thread re-entry.
        self._updating_runtime_status = False

    def emit(self, record):
        """Forward a WARNING-or-worse ``record`` to the status callback.

        Any exception raised while formatting or forwarding the record is
        routed to ``handleError`` (the standard logging convention), so a
        failing status update never breaks the code that emitted the record.
        """
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return
        if self._updating_runtime_status:
            # Re-entrant record produced by the callback itself: drop it.
            return
        self._updating_runtime_status = True
        try:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as
                # status and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, log_msg)
                )
        except Exception:
            self.handleError(record)
        finally:
            self._updating_runtime_status = False
+
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
self.loadingContext.construct_tool_object = self.arv_make_tool
+ # Add a custom logging handler to the root logger for runtime status reporting
+ # if running inside a container
+ if get_current_container(self.api, self.num_retries, logger):
+ root_logger = logging.getLogger('')
+ handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+ root_logger.addHandler(handler)
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
self.task_queue.add(partial(j.done, record))
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
    """
    Updates the runtime_status field on the runner container.

    Called from a failing child container: records the first child error,
    and on subsequent errors only bumps an "(and N more)" counter appended
    to it. Also called from other parts that need to report errors,
    warnings or just activity statuses.

    :param kind: 'error', 'warning' or 'activity'; any other value is
        ignored.
    :param message: short status text stored under the ``kind`` key.
    :param detail: optional longer text stored under ``kind + "Detail"``.

    Best-effort: failures to look up or update the current container are
    logged at INFO level and otherwise ignored. INFO is used so that these
    messages are not fed back into this method by a logging handler that
    reports WARNING/ERROR records as runtime statuses.
    """
    if kind not in ('error', 'warning', 'activity'):
        # Ignore any other status kind -- before paying for an API round trip.
        return
    with self.workflow_eval_lock:
        try:
            current = get_current_container(self.api, self.num_retries, logger)
        except Exception as e:
            # The previous inline lookup treated lookup failures as
            # non-fatal; keep this best-effort.
            logger.info("Couldn't get current container: %s", e)
            return
        if current is None:
            # Not running inside a container: nothing to update.
            return
        # `or {}` also covers an explicit null runtime_status from the API.
        runtime_status = current.get('runtime_status') or {}

        if kind == 'error':
            previous_error = runtime_status.get('error')
            if not previous_error:
                # Only the first error is reported in full.
                runtime_status['error'] = message
                if detail is not None:
                    runtime_status['errorDetail'] = detail
            else:
                # Further errors are only mentioned as a count. Split the
                # stored message into its text and an optional trailing
                # "(and N more)" suffix. The pattern always matches a str
                # (DOTALL lets the text span newlines, \Z anchors the
                # suffix at the very end).
                try:
                    parsed = re.match(r'^(.*?)(?:\s*\(and (\d+) more\))?\Z',
                                      previous_error, re.DOTALL)
                except TypeError:
                    # Ignore tests stubbing errors
                    return
                error_msg, more = parsed.groups()
                failure_qty = int(more) + 1 if more else 1
                runtime_status['error'] = "%s (and %d more)" % (error_msg, failure_qty)
        else:
            # Record the last warning/activity status without regard of
            # previous occurrences.
            runtime_status[kind] = message
            if detail is not None:
                runtime_status[kind + "Detail"] = detail
            else:
                # Drop a detail left over from an earlier status so it is not
                # displayed alongside this unrelated message.
                runtime_status.pop(kind + "Detail", None)

        try:
            self.api.containers().update(uuid=current['uuid'],
                                         body={
                                             'runtime_status': runtime_status,
                                         }).execute(num_retries=self.num_retries)
        except Exception as e:
            logger.info("Couldn't update runtime_status: %s", e)
def wrapped_callback(self, cb, obj, st):
with self.workflow_eval_lock:
def set_crunch_output(self):
if self.work_api == "containers":
- try:
- current = self.api.containers().current().execute(num_retries=self.num_retries)
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
return
try:
self.api.containers().update(uuid=current['uuid'],