X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4aee7d57faff02fc6b7b6f750dc22a29e58bb963..873fcf181c037cc1e42419bfeaf5bb70c9d9e239:/sdk/cwl/arvados_cwl/done.py diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py index c0e3e0de2a..98c9f3a5df 100644 --- a/sdk/cwl/arvados_cwl/done.py +++ b/sdk/cwl/arvados_cwl/done.py @@ -2,11 +2,10 @@ # # SPDX-License-Identifier: Apache-2.0 -from future.utils import viewvalues - import re -from cwltool.errors import WorkflowException + from collections import deque +from cwltool.errors import WorkflowException def done(self, record, tmpdir, outdir, keepdir): cols = [ @@ -57,43 +56,45 @@ def done_outputs(self, record, tmpdir, outdir, keepdir): crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:") timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)") -def logtail(logcollection, logfunc, header, maxlen=25): +def logtail(logcollection, logfunc, header, maxlen=25, include_crunchrun=True): if len(logcollection) == 0: logfunc("%s\n%s", header, " ** log is empty **") return - containersapi = ("crunch-run.txt" in logcollection) mergelogs = {} + logfiles = ["stdout.txt", "stderr.txt"] + + if include_crunchrun: + logfiles.append("crunch-run.txt") + + for log in logfiles: + if log not in logcollection: + continue + logname = log[:-4] # trim off the .txt + logt = deque([], maxlen) + mergelogs[logname] = logt + with logcollection.open(log, encoding="utf-8") as f: + for l in f: + g = timestamp_re.match(l) + logt.append((g.group(1), g.group(2))) + + keys = list(mergelogs) + loglines = [] - for log in list(logcollection): - if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"): - logname = log[:-4] - logt = deque([], maxlen) - mergelogs[logname] = logt - with logcollection.open(log) as f: - for l in f: - if containersapi: - g = timestamp_re.match(l) - logt.append((g.group(1), g.group(2))) - elif not 
crunchstat_re.match(l): - logt.append(l) - - if containersapi: - keys = list(mergelogs) - loglines = [] - while True: - earliest = None - for k in keys: - if mergelogs[k]: - if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]: - earliest = k - if earliest is None: - break - ts, msg = mergelogs[earliest].popleft() - loglines.append("%s %s %s" % (ts, earliest, msg)) - loglines = loglines[-maxlen:] - else: - loglines = mergelogs[list(mergelogs)[0]] + # we assume the log lines are all in order so this is a + # straight linear merge where we look at the next timestamp of + # each log and take whichever one is earliest. + while True: + earliest = None + for k in keys: + if mergelogs[k]: + if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]: + earliest = k + if earliest is None: + break + ts, msg = mergelogs[earliest].popleft() + loglines.append("%s %s %s" % (ts, earliest, msg)) + loglines = loglines[-maxlen:] logtxt = "\n ".join(l.strip() for l in loglines) - logfunc("%s\n\n %s", header, logtxt) + logfunc("%s\n\n %s\n", header, logtxt)