class RuntimeStatusLoggingHandler(logging.Handler):
    """Report warning and error log records as runtime statuses.

    Every record at WARNING level or above that reaches this handler is
    forwarded to the callback given at construction time, so it can be
    recorded on the runner container.

    The callback is invoked as ``callback(kind, message)`` for single-line
    messages and as ``callback(kind, message, detail)`` for multi-line ones,
    where ``kind`` is ``'warning'`` or ``'error'``, ``message`` is
    ``"<logger name>: <first line>"`` and ``detail`` is the remaining text.
    """

    def __init__(self, runtime_status_update_func):
        """Store the callback that receives the status updates."""
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        # True while the callback is running; see emit().
        self._updating = False

    def emit(self, record):
        """Forward *record* to the status callback if it is severe enough.

        Records below WARNING are ignored. Records at ERROR or above
        (including CRITICAL) are reported as 'error'; anything from WARNING
        up to, but excluding, ERROR is reported as 'warning'.
        """
        # Threshold comparisons rather than equality, so CRITICAL and custom
        # levels are not silently dropped.
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            return

        # The callback may itself log a warning or error, which would be
        # routed straight back here (the handler lock is re-entrant), causing
        # unbounded recursion. Drop records produced while an update is
        # already in flight.
        if self._updating:
            return
        self._updating = True
        try:
            # If the logged message is multi-line, use its first line as
            # status and the rest as detail.
            status, newline, detail = record.getMessage().partition('\n')
            summary = "%s: %s" % (record.name, status)
            if newline:
                self.runtime_status_update(kind, summary, detail)
            else:
                self.runtime_status_update(kind, summary)
        except Exception:
            # A logging call must never raise into the code that logged;
            # delegate to the standard logging error hook instead.
            self.handleError(record)
        finally:
            self._updating = False
def runtime_status_update(self, kind, message, detail=None):
    """Update the runtime_status field on the runner container.

    Called from a failing child container: records the first child error,
    or bumps the error count on subsequent error statuses. Also called from
    other parts that need to report errors, warnings or just activity
    statuses.

    kind    -- 'error', 'warning' or 'activity'; any other value is ignored.
    message -- short status text stored under the ``kind`` key.
    detail  -- optional longer text stored under ``kind + "Detail"``
               (for errors, only alongside the first one).

    Does nothing when get_current_container() returns None. A failure of
    the update API call itself is logged at info level and not raised.
    """
    with self.workflow_eval_lock:
        current = get_current_container(self.api, self.num_retries, logger)
        if current is None:
            return
        # The key may be present but null; treat that like a missing key so
        # the lookups below don't fail on None.
        runtime_status = current.get('runtime_status') or {}

        if kind == 'error':
            previous = runtime_status.get('error')
            if not previous:
                # In case of status being an error, only report the first one.
                runtime_status.update({'error': message})
                if detail is not None:
                    runtime_status.update({'errorDetail': detail})
            else:
                # Further errors are only mentioned as a count: split the
                # stored text into the original message and an optional
                # trailing '(and N more)' suffix. DOTALL plus the \Z anchor
                # keep this working when the stored message spans several
                # lines, where a plain '.' pattern fails to match.
                try:
                    parsed = re.match(
                        r'(.*?)(?:\s*\(and (\d+) more\))?\Z',
                        previous, re.DOTALL)
                except TypeError:
                    # Ignore tests stubbing errors
                    return
                error_msg, already_counted = parsed.groups()
                runtime_status.update({
                    'error': "%s (and %d more)" % (
                        error_msg, int(already_counted or 0) + 1)
                })
        elif kind in ('warning', 'activity'):
            # Record the last warning/activity status without regard to
            # previous occurrences.
            runtime_status.update({kind: message})
            if detail is not None:
                runtime_status.update({kind + "Detail": detail})
        else:
            # Ignore any other status kind
            return

        try:
            self.api.containers().update(
                uuid=current['uuid'],
                body={
                    'runtime_status': runtime_status,
                }).execute(num_retries=self.num_retries)
        except Exception as e:
            # Status reporting is best-effort; never fail the workflow
            # because the status could not be written.
            logger.info("Couldn't update runtime_status: %s", e)