X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/04f9ccc7ca627d41175f44f515e4581b6937f43b..refs/heads/20455-noopener:/sdk/cwl/arvados_cwl/done.py diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py index 48466f00c2..5c12419765 100644 --- a/sdk/cwl/arvados_cwl/done.py +++ b/sdk/cwl/arvados_cwl/done.py @@ -1,3 +1,9 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from future.utils import viewvalues + import re from cwltool.errors import WorkflowException from collections import deque @@ -46,50 +52,50 @@ def done(self, record, tmpdir, outdir, keepdir): def done_outputs(self, record, tmpdir, outdir, keepdir): self.builder.outdir = outdir self.builder.pathmapper.keepdir = keepdir - return self.collect_outputs("keep:" + record["output"]) + return self.collect_outputs("keep:" + record["output"], record["exit_code"]) crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:") timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)") -def logtail(logcollection, logger, header, maxlen=25): +def logtail(logcollection, logfunc, header, maxlen=25, include_crunchrun=True): if len(logcollection) == 0: - logger.info(header) - logger.info(" ** log is empty **") + logfunc("%s\n%s", header, " ** log is empty **") return - containersapi = ("crunch-run.txt" in logcollection) mergelogs = {} + logfiles = ["stdout.txt", "stderr.txt"] + + if include_crunchrun: + logfiles.append("crunch-run.txt") + + for log in logfiles: + if log not in logcollection: + continue + logname = log[:-4] # trim off the .txt + logt = deque([], maxlen) + mergelogs[logname] = logt + with logcollection.open(log, encoding="utf-8") as f: + for l in f: + g = timestamp_re.match(l) + logt.append((g.group(1), g.group(2))) + + keys = list(mergelogs) + loglines = [] - for log in logcollection.keys(): - if not containersapi or log in ("crunch-run.txt", 
"stdout.txt", "stderr.txt"): - logname = log[:-4] - logt = deque([], maxlen) - mergelogs[logname] = logt - with logcollection.open(log) as f: - for l in f: - if containersapi: - g = timestamp_re.match(l) - logt.append((g.group(1), g.group(2))) - elif not crunchstat_re.match(l): - logt.append(l) - - if containersapi: - keys = mergelogs.keys() - loglines = [] - while True: - earliest = None - for k in keys: - if mergelogs[k]: - if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]: - earliest = k - if earliest is None: - break - ts, msg = mergelogs[earliest].popleft() - loglines.append("%s %s %s" % (ts, earliest, msg)) - loglines = loglines[-maxlen:] - else: - loglines = mergelogs.values()[0] + # we assume the log lines are all in order so this is a + # straight linear merge where we look at the next timestamp of + # each log and take whichever one is earliest. + while True: + earliest = None + for k in keys: + if mergelogs[k]: + if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]: + earliest = k + if earliest is None: + break + ts, msg = mergelogs[earliest].popleft() + loglines.append("%s %s %s" % (ts, earliest, msg)) + loglines = loglines[-maxlen:] logtxt = "\n ".join(l.strip() for l in loglines) - logger.info(header) - logger.info("\n %s", logtxt) + logfunc("%s\n\n %s\n", header, logtxt)