# Reconstructed from a whitespace-collapsed unified diff of
# sdk/cwl/arvados_cwl/done.py:
# https://git.arvados.org/arvados.git/blobdiff_plain/0250713cc4e6d7fdf41fd7c0a99c6307e2eac72e..fea486f94bb5cc8f51d9563eafc172b6ba2aec57:/sdk/cwl/arvados_cwl/done.py
#
# Only definitions that the patch shows in full are reproduced below.  The
# patch also touches done(self, record, tmpdir, outdir, keepdir), whose body
# is only partially visible in the hunks (it additionally needs
# `from cwltool.errors import WorkflowException`).  Its two visible changes:
#   * the WorkflowException message becomes
#       "[job %s] output '%s' cannot be found on API server"
#       % (self.name, record["output"])
#   * it now returns the result of
#       done_outputs(self, record, tmpdir, outdir, keepdir)
#     instead of discarding it.

import re
from collections import deque


def done_outputs(self, record, tmpdir, outdir, keepdir):
    """Point the builder at the output locations and collect the outputs.

    Stores ``outdir`` on ``self.builder.outdir`` and ``keepdir`` on
    ``self.builder.pathmapper.keepdir``, then returns whatever
    ``self.collect_outputs`` yields for ``"keep:" + record["output"]``.
    ``tmpdir`` is accepted but not used here.
    """
    self.builder.outdir = outdir
    self.builder.pathmapper.keepdir = keepdir
    return self.collect_outputs("keep:" + record["output"])


# Lines matching this pattern (crunchstat output on stderr) are left out of
# the tail when the collection has no crunch-run.txt.
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")

# Splits a line into (timestamp, rest-of-line); used when the collection
# contains crunch-run.txt.
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")


def logtail(logcollection, logger, header, maxlen=25):
    """Write the last ``maxlen`` log lines of ``logcollection`` to ``logger``.

    Two layouts are handled, selected by whether ``"crunch-run.txt"`` is a
    member of the collection:

    * present: only ``crunch-run.txt``, ``stdout.txt`` and ``stderr.txt`` are
      read.  Each line is split by ``timestamp_re`` and the per-file tails
      are merged by timestamp; every emitted line has the form
      ``"<timestamp> <log name> <message>"``.
    * absent: every file is read, lines matching ``crunchstat_re`` are
      dropped, and the tail of the first file is emitted.

    Args:
        logcollection: mapping-like object supporting ``len()``, ``in``,
            ``keys()`` and ``open(name)`` returning a context manager that
            iterates over text lines.
        logger: object with an ``info(msg, *args)`` method.
        header: message logged before the tail itself.
        maxlen: maximum number of lines kept per file and emitted overall.

    Returns:
        None.  The result is delivered through two ``logger.info`` calls.
    """
    if len(logcollection) == 0:
        logger.info(header)
        logger.info(" ** log is empty **")
        return

    containersapi = "crunch-run.txt" in logcollection
    mergelogs = {}

    for log in logcollection.keys():
        if containersapi and log not in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
            continue
        # A bounded deque keeps only the last `maxlen` entries of each file.
        tail = deque([], maxlen)
        mergelogs[log[:-4]] = tail  # drop the 4-character ".txt" suffix
        with logcollection.open(log) as f:
            if not containersapi:
                tail.extend(line for line in f if not crunchstat_re.match(line))
                continue
            last_ts = ""
            for line in f:
                g = timestamp_re.match(line)
                if g:
                    last_ts = g.group(1)
                    tail.append((last_ts, g.group(2)))
                elif line.strip():
                    # No leading timestamp (e.g. a traceback continuation
                    # line): keep it next to the preceding entry by reusing
                    # that entry's timestamp instead of crashing on None.
                    tail.append((last_ts, line.rstrip("\r\n")))

    if containersapi:
        loglines = []
        pending = [name for name in mergelogs if mergelogs[name]]
        while pending:
            # min() returns the first minimal element, so ties go to the log
            # that comes first in iteration order.
            earliest = min(pending, key=lambda name: mergelogs[name][0][0])
            ts, msg = mergelogs[earliest].popleft()
            loglines.append("%s %s %s" % (ts, earliest, msg))
            if not mergelogs[earliest]:
                pending.remove(earliest)
        loglines = loglines[-maxlen:]
    else:
        # dict views are not subscriptable on Python 3; next(iter(...)) picks
        # the first value on both Python 2 and Python 3.
        loglines = next(iter(mergelogs.values()), ())

    logtxt = "\n ".join(l.strip() for l in loglines)
    logger.info(header)
    logger.info("\n %s", logtxt)