+ body={"state": state}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
+ self.final_output = out
+ self.workflow_eval_lock.notifyAll()
+
+
+ def start_run(self, runnable, runtimeContext):
+ self.task_queue.add(partial(runnable.run, runtimeContext))
+
+ def process_submitted(self, container):
+ with self.workflow_eval_lock:
+ self.processes[container.uuid] = container
+
+ def process_done(self, uuid, record):
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ logger.info("%s %s is %s", self.label(j), uuid, record["state"])
+ self.task_queue.add(partial(j.done, record))
+ del self.processes[uuid]
+
+ def runtime_status_update(self, kind, message, detail=None):
+ """
+ Updates the runtime_status field on the runner container.
+ Called from a failing child container: records the first child error
+ or updates the error count on subsequent error statuses.
+ Also called from other parts that need to report errros, warnings or just
+ activity statuses.
+ """
+ with self.workflow_eval_lock:
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
+ return
+ runtime_status = current.get('runtime_status', {})
+ # In case of status being an error, only report the first one.
+ if kind == 'error':
+ if not runtime_status.get('error'):
+ runtime_status.update({
+ 'error': message
+ })
+ if detail is not None:
+ runtime_status.update({
+ 'errorDetail': detail
+ })
+ # Further errors are only mentioned as a count.
+ else:
+ # Get anything before an optional 'and N more' string.
+ try:
+ error_msg = re.match(
+ r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+ more_failures = re.match(
+ r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ except TypeError:
+ # Ignore tests stubbing errors
+ return
+ if more_failures:
+ failure_qty = int(more_failures.groups()[0])
+ runtime_status.update({
+ 'error': "%s (and %d more)" % (error_msg, failure_qty+1)
+ })
+ else:
+ runtime_status.update({
+ 'error': "%s (and 1 more)" % error_msg
+ })
+ elif kind in ['warning', 'activity']:
+ # Record the last warning/activity status without regard of
+ # previous occurences.
+ runtime_status.update({
+ kind: message
+ })
+ if detail is not None:
+ runtime_status.update({
+ kind+"Detail": detail
+ })
+ else:
+ # Ignore any other status kind
+ return
+ try:
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'runtime_status': runtime_status,
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.info("Couldn't update runtime_status: %s", e)
+
+ def wrapped_callback(self, cb, obj, st):
+ with self.workflow_eval_lock:
+ cb(obj, st)
+ self.workflow_eval_lock.notifyAll()
+
+ def get_wrapped_callback(self, cb):
+ return partial(self.wrapped_callback, cb)