20455: Use noopener everywhere on links and window.open
[arvados.git] / sdk / cwl / arvados_cwl / done.py
index 69b074cc73ccb02d0b4c12e096075c12f2d02871..5c1241976597286eba0edbda62690a431d349c0f 100644 (file)
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from future.utils import viewvalues
+
 import re
 from cwltool.errors import WorkflowException
 from collections import deque
 
 def done(self, record, tmpdir, outdir, keepdir):
-    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-    # check if collection already exists with same owner, name and content
-    collection_exists = self.arvrunner.api.collections().list(
-        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                 ['portable_data_hash', '=', record["output"]],
-                 ["name", "=", colname]]
-    ).execute(num_retries=self.arvrunner.num_retries)
-
-    if not collection_exists["items"]:
-        # Create a collection located in the same project as the
-        # pipeline with the contents of the output.
-        # First, get output record.
-        collections = self.arvrunner.api.collections().list(
-            limit=1,
-            filters=[['portable_data_hash', '=', record["output"]]],
-            select=["manifest_text"]
+    cols = [
+        ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
+        ("log", "Log of %s" % (record["uuid"]), record["log"])
+    ]
+
+    for coltype, colname, colpdh in cols:
+        # check if collection already exists with same owner, name and content
+        collection_exists = self.arvrunner.api.collections().list(
+            filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                     ['portable_data_hash', '=', colpdh],
+                     ["name", "=", colname]]
         ).execute(num_retries=self.arvrunner.num_retries)
 
-        if not collections["items"]:
-            raise WorkflowException(
-                "[job %s] output '%s' cannot be found on API server" % (
-                    self.name, record["output"]))
-
-        # Create new collection in the parent project
-        # with the output contents.
-        self.arvrunner.api.collections().create(body={
-            "owner_uuid": self.arvrunner.project_uuid,
-            "name": colname,
-            "portable_data_hash": record["output"],
-            "manifest_text": collections["items"][0]["manifest_text"]
-        }, ensure_unique_name=True).execute(
-            num_retries=self.arvrunner.num_retries)
+        if not collection_exists["items"]:
+            # Create a collection located in the same project as the
+            # pipeline with the contents of the output/log.
+            # First, get output/log record.
+            collections = self.arvrunner.api.collections().list(
+                limit=1,
+                filters=[['portable_data_hash', '=', colpdh]],
+                select=["manifest_text"]
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            if not collections["items"]:
+                raise WorkflowException(
+                    "[job %s] %s '%s' cannot be found on API server" % (
+                        self.name, coltype, colpdh))
+
+            # Create new collection in the parent project
+            # with the output/log contents.
+            self.arvrunner.api.collections().create(body={
+                "owner_uuid": self.arvrunner.project_uuid,
+                "name": colname,
+                "portable_data_hash": colpdh,
+                "manifest_text": collections["items"][0]["manifest_text"]
+            }, ensure_unique_name=True).execute(
+                num_retries=self.arvrunner.num_retries)
 
     return done_outputs(self, record, tmpdir, outdir, keepdir)
 
 def done_outputs(self, record, tmpdir, outdir, keepdir):
     self.builder.outdir = outdir
     self.builder.pathmapper.keepdir = keepdir
-    return self.collect_outputs("keep:" + record["output"])
+    return self.collect_outputs("keep:" + record["output"], record["exit_code"])
 
 crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
 timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
 
-def logtail(logcollection, logger, header, maxlen=25):
+def logtail(logcollection, logfunc, header, maxlen=25, include_crunchrun=True):
     if len(logcollection) == 0:
-        logger.info(header)
-        logger.info("  ** log is empty **")
+        logfunc("%s\n%s", header, "  ** log is empty **")
         return
 
-    containersapi = ("crunch-run.txt" in logcollection)
     mergelogs = {}
+    logfiles = ["stdout.txt", "stderr.txt"]
+
+    if include_crunchrun:
+        logfiles.append("crunch-run.txt")
+
+    for log in logfiles:
+        if log not in logcollection:
+            continue
+        logname = log[:-4]  # trim off the .txt
+        logt = deque([], maxlen)
+        mergelogs[logname] = logt
+        with logcollection.open(log, encoding="utf-8") as f:
+            for l in f:
+                g = timestamp_re.match(l)
+                logt.append((g.group(1), g.group(2)))
+
+    keys = list(mergelogs)
+    loglines = []
 
-    for log in logcollection.keys():
-        if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
-            logname = log[:-4]
-            logt = deque([], maxlen)
-            mergelogs[logname] = logt
-            with logcollection.open(log) as f:
-                for l in f:
-                    if containersapi:
-                        g = timestamp_re.match(l)
-                        logt.append((g.group(1), g.group(2)))
-                    elif not crunchstat_re.match(l):
-                        logt.append(l)
-
-    if containersapi:
-        keys = mergelogs.keys()
-        loglines = []
-        while True:
-            earliest = None
-            for k in keys:
-                if mergelogs[k]:
-                    if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
-                        earliest = k
-            if earliest is None:
-                break
-            ts, msg = mergelogs[earliest].popleft()
-            loglines.append("%s %s %s" % (ts, earliest, msg))
-        loglines = loglines[-maxlen:]
-    else:
-        loglines = mergelogs.values()[0]
+    # we assume the log lines are all in order so this this is a
+    # straight linear merge where we look at the next timestamp of
+    # each log and take whichever one is earliest.
+    while True:
+        earliest = None
+        for k in keys:
+            if mergelogs[k]:
+                if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
+                    earliest = k
+        if earliest is None:
+            break
+        ts, msg = mergelogs[earliest].popleft()
+        loglines.append("%s %s %s" % (ts, earliest, msg))
+    loglines = loglines[-maxlen:]
 
     logtxt = "\n  ".join(l.strip() for l in loglines)
-    logger.info(header)
-    logger.info("\n  %s", logtxt)
+    logfunc("%s\n\n  %s\n", header, logtxt)