from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
+from .util import get_current_container
from ._version import __version__
from cwltool.pack import pack
DEFAULT_PRIORITY = 500
+class RuntimeStatusLoggingHandler(logging.Handler):
+ """
+ Intercepts logging calls and report them as runtime statuses on runner
+ containers.
+ """
+ def __init__(self, runtime_status_update_func):
+ super(RuntimeStatusLoggingHandler, self).__init__()
+ self.runtime_status_update = runtime_status_update_func
+
+ def emit(self, record):
+ kind = None
+ if record.levelno == logging.ERROR:
+ kind = 'error'
+ elif record.levelno == logging.WARNING:
+ kind = 'warning'
+ elif record.levelno == logging.INFO:
+ kind = 'activity'
+ if kind is not None:
+ log_msg = record.getMessage()
+ if '\n' in log_msg:
+ # If the logged message is multi-line, include it as a detail
+ self.runtime_status_update(
+ kind,
+ "%s from %s (please see details)" % (kind, record.name),
+ log_msg
+ )
+ else:
+ self.runtime_status_update(
+ kind,
+ "%s: %s" % (record.name, record.getMessage())
+ )
+
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
self.loadingContext.construct_tool_object = self.arv_make_tool
+ # Add a custom logging handler to the root logger for runtime status reporting
+ # if running inside a container
+ if get_current_container(self.api, self.num_retries, logger):
+ root_logger = logging.getLogger('')
+ handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+ root_logger.addHandler(handler)
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
activity statuses.
"""
with self.workflow_eval_lock:
- try:
- current = self.api.containers().current().execute(num_retries=self.num_retries)
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
return
runtime_status = current.get('runtime_status', {})
# In case of status being an error, only report the first one.
# Further errors are only mentioned as a count.
else:
# Get anything before an optional 'and N more' string.
- error_msg = re.match(
- r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
- more_failures = re.match(
- r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ try:
+ error_msg = re.match(
+ r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+ more_failures = re.match(
+ r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ except TypeError:
+ # Ignore tests stubbing errors
+ return
if more_failures:
failure_qty = int(more_failures.groups()[0])
runtime_status.update({
def set_crunch_output(self):
if self.work_api == "containers":
- try:
- current = self.api.containers().current().execute(num_retries=self.num_retries)
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
return
try:
self.api.containers().update(uuid=current['uuid'],