from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
+from .util import get_current_container
from ._version import __version__
from cwltool.pack import pack
DEFAULT_PRIORITY = 500
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.

    Records at WARNING level or above are forwarded to the callable supplied
    at construction time; anything below WARNING is ignored.
    """

    def __init__(self, runtime_status_update_func):
        """
        runtime_status_update_func -- callable invoked either as
            func(kind, message) or func(kind, message, detail), where kind
            is 'error' or 'warning'.
        """
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        # True while a status update started by this handler is in progress.
        self.updating_runtime_status = False

    def emit(self, record):
        """
        Forward a log record as a runtime status.

        The status message is "<logger name>: <first line of the message>".
        For a multi-line message the remaining lines are passed as detail.
        """
        # Compare with >= so CRITICAL (and custom levels above ERROR or
        # WARNING) are reported instead of being silently dropped.
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return

        if self.updating_runtime_status:
            # The update callable itself (or something it calls) logged a
            # warning/error. Reporting that one too would re-enter the
            # update while it is still running, so drop it here.
            return

        self.updating_runtime_status = True
        try:
            # If the logged message is multi-line, use its first line as
            # status and the rest as detail.
            status, newline, detail = record.getMessage().partition('\n')
            summary = "%s: %s" % (record.name, status)
            if newline:
                self.runtime_status_update(kind, summary, detail)
            else:
                self.runtime_status_update(kind, summary)
        finally:
            self.updating_runtime_status = False
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
if arvargs.work_api is None:
raise Exception("No supported APIs")
else:
- raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+ raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
+
+ if self.work_api == "jobs":
+ logger.warn("""
+*******************************
+Using the deprecated 'jobs' API.
+
+To get rid of this warning:
+
+Users: read about migrating at
+http://doc.arvados.org/user/cwl/cwl-style.html#migrate
+and use the option --api=containers
+
+Admins: configure the cluster to disable the 'jobs' API as described at:
+http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+*******************************""")
self.loadingContext = ArvLoadingContext(vars(arvargs))
self.loadingContext.fetcher_constructor = self.fetcher_constructor
self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
self.loadingContext.construct_tool_object = self.arv_make_tool
+ # Add a custom logging handler to the root logger for runtime status reporting
+ # if running inside a container
+ if get_current_container(self.api, self.num_retries, logger):
+ root_logger = logging.getLogger('')
+ handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+ root_logger.addHandler(handler)
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
with self.workflow_eval_lock:
if processStatus == "success":
logger.info("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
+ state = "Complete"
else:
logger.error("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ state = "Failed"
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": state}).execute(num_retries=self.num_retries)
self.final_status = processStatus
self.final_output = out
self.workflow_eval_lock.notifyAll()
self.task_queue.add(partial(j.done, record))
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
    """
    Updates the runtime_status field on the runner container.
    Called from a failing child container: records the first child error
    or updates the error count on subsequent error statuses.
    Also called from other parts that need to report errors, warnings or just
    activity statuses.

    kind -- 'error', 'warning' or 'activity'; any other value is ignored.
    message -- short status text.
    detail -- optional longer text, stored under the kind + "Detail" key.

    Returns None. Does nothing when get_current_container() returns None.
    A failure to update the container record is logged at info level and
    not raised.
    """
    if kind not in ('error', 'warning', 'activity'):
        # Ignore any other status kind, before taking the lock or
        # querying the API.
        return

    with self.workflow_eval_lock:
        current = get_current_container(self.api, self.num_retries, logger)
        if current is None:
            return
        runtime_status = current.get('runtime_status', {})

        if kind == 'error':
            previous = runtime_status.get('error')
            if not previous:
                # In case of status being an error, only report the first one.
                runtime_status.update({
                    'error': message
                })
                if detail is not None:
                    runtime_status.update({
                        'errorDetail': detail
                    })
            else:
                # Further errors are only mentioned as a count.
                # re.DOTALL lets '.' cross newlines: without it a stored
                # multi-line error makes the first match return None and
                # the .groups() call raise AttributeError.
                try:
                    # Get anything before an optional 'and N more' string.
                    error_msg = re.match(
                        r'^(.*?)(?=\s*\(and \d+ more\)|$)',
                        previous, re.DOTALL).groups()[0]
                    more_failures = re.match(
                        r'.*\(and (\d+) more\)', previous, re.DOTALL)
                except TypeError:
                    # Ignore tests stubbing errors
                    return
                if more_failures:
                    failure_qty = int(more_failures.groups()[0]) + 1
                else:
                    failure_qty = 1
                runtime_status.update({
                    'error': "%s (and %d more)" % (error_msg, failure_qty)
                })
        else:
            # Record the last warning/activity status without regard of
            # previous occurrences.
            runtime_status.update({
                kind: message
            })
            if detail is not None:
                runtime_status.update({
                    kind+"Detail": detail
                })
            else:
                # Drop the detail left by an earlier status of this kind so
                # it is not shown next to an unrelated new message.
                runtime_status.pop(kind+"Detail", None)

        try:
            self.api.containers().update(uuid=current['uuid'],
                                         body={
                                             'runtime_status': runtime_status,
                                         }).execute(num_retries=self.num_retries)
        except Exception as e:
            logger.info("Couldn't update runtime_status: %s", e)
def wrapped_callback(self, cb, obj, st):
    """Invoke cb(obj, st) while holding self.workflow_eval_lock."""
    with self.workflow_eval_lock:
        cb(obj, st)
def set_crunch_output(self):
if self.work_api == "containers":
- try:
- current = self.api.containers().current().execute(num_retries=self.num_retries)
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
return
try:
self.api.containers().update(uuid=current['uuid'],