Merge branch '18260-update-cwltool' refs #18260
[arvados.git] / sdk / cwl / arvados_cwl / done.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from future.utils import viewvalues
6
7 import re
8 from cwltool.errors import WorkflowException
9 from collections import deque
10
11 def done(self, record, tmpdir, outdir, keepdir):
12     cols = [
13         ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
14         ("log", "Log of %s" % (record["uuid"]), record["log"])
15     ]
16
17     for coltype, colname, colpdh in cols:
18         # check if collection already exists with same owner, name and content
19         collection_exists = self.arvrunner.api.collections().list(
20             filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
21                      ['portable_data_hash', '=', colpdh],
22                      ["name", "=", colname]]
23         ).execute(num_retries=self.arvrunner.num_retries)
24
25         if not collection_exists["items"]:
26             # Create a collection located in the same project as the
27             # pipeline with the contents of the output/log.
28             # First, get output/log record.
29             collections = self.arvrunner.api.collections().list(
30                 limit=1,
31                 filters=[['portable_data_hash', '=', colpdh]],
32                 select=["manifest_text"]
33             ).execute(num_retries=self.arvrunner.num_retries)
34
35             if not collections["items"]:
36                 raise WorkflowException(
37                     "[job %s] %s '%s' cannot be found on API server" % (
38                         self.name, coltype, colpdh))
39
40             # Create new collection in the parent project
41             # with the output/log contents.
42             self.arvrunner.api.collections().create(body={
43                 "owner_uuid": self.arvrunner.project_uuid,
44                 "name": colname,
45                 "portable_data_hash": colpdh,
46                 "manifest_text": collections["items"][0]["manifest_text"]
47             }, ensure_unique_name=True).execute(
48                 num_retries=self.arvrunner.num_retries)
49
50     return done_outputs(self, record, tmpdir, outdir, keepdir)
51
52 def done_outputs(self, record, tmpdir, outdir, keepdir):
53     self.builder.outdir = outdir
54     self.builder.pathmapper.keepdir = keepdir
55     return self.collect_outputs("keep:" + record["output"], record["exit_code"])
56
57 crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
58 timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
59
60 def logtail(logcollection, logfunc, header, maxlen=25):
61     if len(logcollection) == 0:
62         logfunc("%s\n%s", header, "  ** log is empty **")
63         return
64
65     containersapi = ("crunch-run.txt" in logcollection)
66     mergelogs = {}
67
68     for log in list(logcollection):
69         if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
70             logname = log[:-4]
71             logt = deque([], maxlen)
72             mergelogs[logname] = logt
73             with logcollection.open(log, encoding="utf-8") as f:
74                 for l in f:
75                     if containersapi:
76                         g = timestamp_re.match(l)
77                         logt.append((g.group(1), g.group(2)))
78                     elif not crunchstat_re.match(l):
79                         logt.append(l)
80
81     if containersapi:
82         keys = list(mergelogs)
83         loglines = []
84         while True:
85             earliest = None
86             for k in keys:
87                 if mergelogs[k]:
88                     if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
89                         earliest = k
90             if earliest is None:
91                 break
92             ts, msg = mergelogs[earliest].popleft()
93             loglines.append("%s %s %s" % (ts, earliest, msg))
94         loglines = loglines[-maxlen:]
95     else:
96         loglines = mergelogs[list(mergelogs)[0]]
97
98     logtxt = "\n  ".join(l.strip() for l in loglines)
99     logfunc("%s\n\n  %s", header, logtxt)