X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dc7d01f4d4031962ffd5734ca0c64146a7217e4a..eb58fd945645f5a670c761f7046b10885941167e:/sdk/cwl/arvados_cwl/done.py diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py index 9fe157f082..6d46e79cb8 100644 --- a/sdk/cwl/arvados_cwl/done.py +++ b/sdk/cwl/arvados_cwl/done.py @@ -1,41 +1,49 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + import re from cwltool.errors import WorkflowException from collections import deque def done(self, record, tmpdir, outdir, keepdir): - colname = "Output %s of %s" % (record["output"][0:7], self.name) - - # check if collection already exists with same owner, name and content - collection_exists = self.arvrunner.api.collections().list( - filters=[["owner_uuid", "=", self.arvrunner.project_uuid], - ['portable_data_hash', '=', record["output"]], - ["name", "=", colname]] - ).execute(num_retries=self.arvrunner.num_retries) + cols = [ + ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]), + ("log", "Log of %s" % (record["uuid"]), record["log"]) + ] - if not collection_exists["items"]: - # Create a collection located in the same project as the - # pipeline with the contents of the output. - # First, get output record. - collections = self.arvrunner.api.collections().list( - limit=1, - filters=[['portable_data_hash', '=', record["output"]]], - select=["manifest_text"] + for coltype, colname, colpdh in cols: + # check if collection already exists with same owner, name and content + collection_exists = self.arvrunner.api.collections().list( + filters=[["owner_uuid", "=", self.arvrunner.project_uuid], + ['portable_data_hash', '=', colpdh], + ["name", "=", colname]] ).execute(num_retries=self.arvrunner.num_retries) - if not collections["items"]: - raise WorkflowException( - "[job %s] output '%s' cannot be found on API server" % ( - self.name, record["output"])) + if not collection_exists["items"]: + # Create a collection located in the same project as the + # pipeline with the contents of the output/log. + # First, get output/log record. + collections = self.arvrunner.api.collections().list( + limit=1, + filters=[['portable_data_hash', '=', colpdh]], + select=["manifest_text"] + ).execute(num_retries=self.arvrunner.num_retries) + + if not collections["items"]: + raise WorkflowException( + "[job %s] %s '%s' cannot be found on API server" % ( + self.name, coltype, colpdh)) - # Create new collection in the parent project - # with the output contents. - self.arvrunner.api.collections().create(body={ - "owner_uuid": self.arvrunner.project_uuid, - "name": colname, - "portable_data_hash": record["output"], - "manifest_text": collections["items"][0]["manifest_text"] - }, ensure_unique_name=True).execute( - num_retries=self.arvrunner.num_retries) + # Create new collection in the parent project + # with the output/log contents. + self.arvrunner.api.collections().create(body={ + "owner_uuid": self.arvrunner.project_uuid, + "name": colname, + "portable_data_hash": colpdh, + "manifest_text": collections["items"][0]["manifest_text"] + }, ensure_unique_name=True).execute( + num_retries=self.arvrunner.num_retries) return done_outputs(self, record, tmpdir, outdir, keepdir) @@ -47,10 +55,9 @@ def done_outputs(self, record, tmpdir, outdir, keepdir): crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:") timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)") -def logtail(logcollection, logger, header, maxlen=25): +def logtail(logcollection, logfunc, header, maxlen=25): if len(logcollection) == 0: - logger.info(header) - logger.info(" ** log is empty **") + logfunc("%s\n%s", header, " ** log is empty **") return containersapi = ("crunch-run.txt" in logcollection) @@ -69,7 +76,7 @@ def logtail(logcollection, logger, header, maxlen=25): elif not crunchstat_re.match(l): logt.append(l) - if len(mergelogs) > 1: + if containersapi: keys = mergelogs.keys() loglines = [] while True: @@ -87,5 +94,4 @@ def logtail(logcollection, logger, header, maxlen=25): loglines = mergelogs.values()[0] logtxt = "\n ".join(l.strip() for l in loglines) - logger.info(header) - logger.info("\n %s", logtxt) + logfunc("%s\n\n %s", header, logtxt)