# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future.utils import viewvalues

import re
from cwltool.errors import WorkflowException
from collections import deque


def done(self, record, tmpdir, outdir, keepdir):
    """Copy a finished step's output (and log) into the project, then collect outputs.

    ``self`` is the step wrapper; this function reads ``self.name`` and
    ``self.arvrunner`` (``api``, ``project_uuid``, ``num_retries``).
    ``record`` is the API record of the finished step; its ``output`` and
    ``log`` portable data hashes and its ``uuid`` are read here.

    For each collection, a named copy is created in
    ``self.arvrunner.project_uuid``:

    - the output is named "Output <first 7 chars of pdh> of <name>";
    - the log is named "Log of <uuid>".

    The copy is skipped when a collection with the same owner, name and
    portable data hash already exists.

    Raises:
        WorkflowException: if a referenced collection cannot be found on
            the API server.

    Returns:
        The result of :func:`done_outputs`.
    """
    cols = [
        ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
    ]
    # The log copy is a convenience.  A record with no log collection (null)
    # should not fail the step with "log 'None' cannot be found".
    if record["log"]:
        cols.append(("log", "Log of %s" % (record["uuid"]), record["log"]))

    for coltype, colname, colpdh in cols:
        # check if collection already exists with same owner, name and content
        collection_exists = self.arvrunner.api.collections().list(
            filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                     ['portable_data_hash', '=', colpdh],
                     ["name", "=", colname]]
        ).execute(num_retries=self.arvrunner.num_retries)

        if collection_exists["items"]:
            continue

        # Create a collection located in the same project as the
        # pipeline with the contents of the output/log.
        # First, get the manifest of the existing output/log collection.
        collections = self.arvrunner.api.collections().list(
            limit=1,
            filters=[['portable_data_hash', '=', colpdh]],
            select=["manifest_text"]
        ).execute(num_retries=self.arvrunner.num_retries)

        if not collections["items"]:
            raise WorkflowException(
                "[job %s] %s '%s' cannot be found on API server" % (
                    self.name, coltype, colpdh))

        # Create new collection in the parent project
        # with the output/log contents.
        self.arvrunner.api.collections().create(body={
            "owner_uuid": self.arvrunner.project_uuid,
            "name": colname,
            "portable_data_hash": colpdh,
            "manifest_text": collections["items"][0]["manifest_text"]
        }, ensure_unique_name=True).execute(
            num_retries=self.arvrunner.num_retries)

    return done_outputs(self, record, tmpdir, outdir, keepdir)


def done_outputs(self, record, tmpdir, outdir, keepdir):
    """Point the builder at the step's output dirs and collect its CWL outputs.

    Passes ``"keep:" + record["output"]`` and ``record["exit_code"]`` to
    ``self.collect_outputs`` and returns its result.  ``tmpdir`` is unused.
    """
    self.builder.outdir = outdir
    self.builder.pathmapper.keepdir = keepdir
    return self.collect_outputs("keep:" + record["output"], record["exit_code"])


crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")


def logtail(logcollection, logfunc, header, maxlen=25):
    """Report the last ``maxlen`` log lines of a step through ``logfunc``.

    ``logcollection`` must support ``len()``, ``in``, iteration over
    top-level file names, and ``open(name, encoding=...)`` (presumably an
    Arvados collection; confirm against callers).  ``logfunc`` is called
    once with a %-style format string and its arguments, like a logger
    method.

    The input is treated as one of two log layouts:

    - Containers API, detected by the presence of ``crunch-run.txt``:
      only ``crunch-run.txt``, ``stdout.txt`` and ``stderr.txt`` are read.
      Each line is expected to start with an ISO-8601 timestamp.  The files
      are merged by timestamp, and every line is prefixed with its
      timestamp and the file's base name.
    - Jobs API, otherwise: every file is read and crunchstat lines are
      dropped.  With several files, their tails are merged by sorting the
      lines, which start with a timestamp.
    """
    if len(logcollection) == 0:
        logfunc("%s\n%s", header, " ** log is empty **")
        return

    containersapi = ("crunch-run.txt" in logcollection)
    container_logs = ("crunch-run.txt", "stdout.txt", "stderr.txt")

    mergelogs = {}

    for log in list(logcollection):
        if containersapi and log not in container_logs:
            continue
        logname = log[:-4]  # trim the ".txt" suffix
        # Keep only the last maxlen lines of each file.
        logt = deque([], maxlen)
        mergelogs[logname] = logt
        last_ts = ""
        with logcollection.open(log, encoding="utf-8") as f:
            for l in f:
                if containersapi:
                    g = timestamp_re.match(l)
                    if g:
                        last_ts = g.group(1)
                        logt.append((last_ts, g.group(2)))
                    else:
                        # A line without a leading timestamp (e.g. the
                        # continuation of a multi-line message) used to crash
                        # with AttributeError here.  Keep it with the previous
                        # line's timestamp so the merge order is preserved.
                        logt.append((last_ts, l.rstrip("\r\n")))
                elif not crunchstat_re.match(l):
                    logt.append(l)

    if containersapi:
        keys = list(mergelogs)
        loglines = []
        # Linear merge: each file is assumed to be in timestamp order, so
        # repeatedly take the head line with the earliest timestamp.  Ties
        # go to the file listed first in keys.
        while True:
            earliest = None
            for k in keys:
                if mergelogs[k]:
                    if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
                        earliest = k
            if earliest is None:
                break
            ts, msg = mergelogs[earliest].popleft()
            loglines.append("%s %s %s" % (ts, earliest, msg))
        loglines = loglines[-maxlen:]
    else:
        tails = list(viewvalues(mergelogs))
        if len(tails) == 1:
            loglines = tails[0]
        else:
            # Several job log files: merge their tails by sorting
            # (lines start with a timestamp) rather than showing only one
            # arbitrarily chosen file.
            loglines = sorted(l for tail in tails for l in tail)[-maxlen:]

    logtxt = "\n ".join(l.strip() for l in loglines)
    logfunc("%s\n\n %s", header, logtxt)