import re
from collections import deque

# Matches old-format log lines ("YYYY-MM-DD_hh:mm:ss <uuid> <n> <n> ...")
# whose payload starts with "stderr crunchstat:"; logtail() drops these
# lines when reading logs that are not in the container layout.
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
# Splits a line into (timestamp, message) where the timestamp looks like
# "YYYY-MM-DDThh:mm:ss.fffZ" and is followed by a single space.
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")


def _read_tail(f, containersapi, maxlen):
    """Return a deque holding the last `maxlen` (sort_key, text) entries of f.

    In container mode the sort key is the line's timestamp and the text is
    the remainder of the line.  A line without a timestamp inherits the
    timestamp of the line before it (or "" at the start of the file), so it
    is neither lost nor able to crash the reader.  Otherwise crunchstat
    lines are skipped and the whole line serves as both sort key and text.
    """
    tail = deque([], maxlen)
    last_ts = ""
    for line in f:
        if containersapi:
            g = timestamp_re.match(line)
            if g:
                last_ts = g.group(1)
                tail.append((last_ts, g.group(2)))
            else:
                tail.append((last_ts, line.rstrip("\n")))
        elif not crunchstat_re.match(line):
            tail.append((line, line))
    return tail


def _merge_tails(tails):
    """Merge per-log deques into one list of (logname, sort_key, text).

    This is a straight linear merge: it repeatedly takes the entry with the
    smallest sort key among the heads of all logs, so each log is assumed to
    be in order already.  Ties go to the log that comes first in `tails`.
    The deques are consumed.
    """
    names = list(tails)
    merged = []
    while True:
        earliest = None
        for name in names:
            if tails[name]:
                if earliest is None or tails[name][0][0] < tails[earliest][0][0]:
                    earliest = name
        if earliest is None:
            return merged
        key, text = tails[earliest].popleft()
        merged.append((earliest, key, text))


def logtail(logcollection, logger, header, maxlen=25):
    """Log `header` followed by the last `maxlen` lines of a log collection.

    logcollection -- mapping-like object providing len(), `in`, keys() and
        open(name) returning an iterable of text lines usable as a context
        manager.
    logger -- object with a logging-style info(fmt, *args) method.
    header -- message logged before the log text.
    maxlen -- maximum number of log lines to report.

    If the collection contains "crunch-run.txt", only "crunch-run.txt",
    "stdout.txt" and "stderr.txt" are read, and each reported line has the
    form "<timestamp> <logname> <message>".  Otherwise every file is read,
    crunchstat lines are dropped and lines are reported as-is.  In both
    cases lines from multiple files are interleaved in sort-key order.
    """
    if len(logcollection) == 0:
        logger.info(header)
        logger.info(" ** log is empty **")
        return

    containersapi = ("crunch-run.txt" in logcollection)
    mergelogs = {}

    for log in logcollection.keys():
        if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
            logname = log[:-4]  # trim off the 4-character extension (".txt")
            with logcollection.open(log) as f:
                mergelogs[logname] = _read_tail(f, containersapi, maxlen)

    # Always merge, even with a single log, so every entry goes through the
    # same (sort_key, text) handling regardless of how many files exist.
    merged = _merge_tails(mergelogs)
    if containersapi:
        loglines = ["%s %s %s" % (key, name, text) for name, key, text in merged]
    else:
        loglines = [text for _, _, text in merged]
    loglines = loglines[-maxlen:]

    logtxt = "\n ".join(l.strip() for l in loglines)
    logger.info(header)
    logger.info("\n %s", logtxt)