21718: Replace .decode method with str(bytes, "utf-8")
[arvados.git] / sdk / cwl / arvados_cwl / done.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import re
6
7 from collections import deque
8 from cwltool.errors import WorkflowException
9
def done(self, record, tmpdir, outdir, keepdir):
    """Publish the output and log collections of a finished run, then collect outputs.

    For both the output and the log portable data hash found in ``record``,
    make sure a collection with the expected name and content exists in
    ``self.arvrunner.project_uuid``; create one when it does not.  Finally
    delegate to ``done_outputs`` and return its result.

    Arguments:
      record -- mapping read here for the keys "output", "uuid" and "log"
                (and, via ``done_outputs``, "exit_code")
      tmpdir, outdir, keepdir -- passed through unchanged to ``done_outputs``

    Raises WorkflowException when no collection with the requested portable
    data hash can be listed from the API server.
    """
    runner = self.arvrunner

    # (kind, collection name, portable data hash) for everything to publish.
    wanted = (
        ("output", "Output %s of %s" % (record["output"][:7], self.name), record["output"]),
        ("log", "Log of %s" % record["uuid"], record["log"]),
    )

    for kind, title, pdh in wanted:
        # Nothing to do when a collection with the same owner, name and
        # content is already present.
        same_collection = [["owner_uuid", "=", runner.project_uuid],
                           ['portable_data_hash', '=', pdh],
                           ["name", "=", title]]
        already_there = runner.api.collections().list(
            filters=same_collection
        ).execute(num_retries=runner.num_retries)
        if already_there["items"]:
            continue

        # Fetch the manifest of any one collection carrying this content so
        # that it can be copied into the project.
        by_content = [['portable_data_hash', '=', pdh]]
        found = runner.api.collections().list(
            limit=1,
            filters=by_content,
            select=["manifest_text"]
        ).execute(num_retries=runner.num_retries)

        if not found["items"]:
            raise WorkflowException(
                "[job %s] %s '%s' cannot be found on API server" % (
                    self.name, kind, pdh))

        # Create the new collection in the project with the output/log
        # contents; ensure_unique_name is passed so the create request is
        # made with that flag set.
        new_collection = {
            "owner_uuid": runner.project_uuid,
            "name": title,
            "portable_data_hash": pdh,
            "manifest_text": found["items"][0]["manifest_text"]
        }
        runner.api.collections().create(
            body=new_collection, ensure_unique_name=True
        ).execute(num_retries=runner.num_retries)

    return done_outputs(self, record, tmpdir, outdir, keepdir)
50
def done_outputs(self, record, tmpdir, outdir, keepdir):
    """Record the output locations on the builder and collect the run's outputs.

    Stores ``outdir`` on ``self.builder`` and ``keepdir`` on the builder's
    path mapper, then returns whatever ``self.collect_outputs`` produces for
    the "keep:"-prefixed value of ``record["output"]`` together with
    ``record["exit_code"]``.  ``tmpdir`` is accepted but not used here.
    """
    builder = self.builder
    builder.outdir = outdir
    builder.pathmapper.keepdir = keepdir

    collect = self.collect_outputs
    output_location = "keep:" + record["output"]
    return collect(output_location, record["exit_code"])
55
# Matches the start of a crunchstat line: "YYYY-MM-DD_HH:MM:SS", an
# identifier of the form xxxxx-8i9sb-xxxxxxxxxxxxxxx (presumably a job
# UUID -- confirm), two integers, then "stderr crunchstat:".
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
# Splits a log line into (timestamp, message); the timestamp looks like
# "YYYY-MM-DDTHH:MM:SS.fffZ" and is separated from the message by one space.
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")

def logtail(logcollection, logfunc, header, maxlen=25, include_crunchrun=True):
    """Report the merged tail of the log files found in a log collection.

    Reads "stdout.txt" and "stderr.txt" (plus "crunch-run.txt" when
    ``include_crunchrun`` is true), keeps at most ``maxlen`` trailing lines
    of each, merges them by timestamp and passes the last ``maxlen`` merged
    lines to ``logfunc``.

    Arguments:
      logcollection -- object supporting ``len()``, ``in`` and
                       ``open(name, encoding=...)`` returning an iterable
                       of text lines usable as a context manager
      logfunc -- logging-style callable invoked as ``logfunc(fmt, *args)``
      header -- text emitted before the log lines
      maxlen -- maximum number of lines kept per file and in the result
      include_crunchrun -- whether "crunch-run.txt" takes part in the merge

    Lines that do not start with a timestamp used to raise AttributeError
    (``timestamp_re.match`` returned None).  Blank lines are now skipped and
    any other untimestamped line is kept as a continuation carrying the
    timestamp of the previous line of the same file (or an empty timestamp
    when there is no previous line).

    Returns None; the result is delivered through ``logfunc``.
    """
    if len(logcollection) == 0:
        logfunc("%s\n%s", header, "  ** log is empty **")
        return

    mergelogs = {}
    logfiles = ["stdout.txt", "stderr.txt"]

    if include_crunchrun:
        logfiles.append("crunch-run.txt")

    for log in logfiles:
        if log not in logcollection:
            continue
        logname = log[:-4]  # trim off the .txt
        # A bounded deque keeps only the last maxlen entries of this file.
        logt = deque([], maxlen)
        mergelogs[logname] = logt
        last_ts = ""
        with logcollection.open(log, encoding="utf-8") as f:
            for line in f:
                g = timestamp_re.match(line)
                if g is not None:
                    last_ts = g.group(1)
                    logt.append((last_ts, g.group(2)))
                elif line.strip():
                    # Untimestamped, non-blank line (e.g. part of a
                    # multi-line message): keep it next to the line it
                    # follows by reusing that line's timestamp.
                    logt.append((last_ts, line.rstrip("\n")))

    keys = list(mergelogs)
    loglines = []

    # We assume the log lines are all in order, so this is a
    # straight linear merge where we look at the next timestamp of
    # each log and take whichever one is earliest.  The strict "<"
    # means that on equal timestamps the file listed first wins.
    while True:
        earliest = None
        for k in keys:
            if mergelogs[k]:
                if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
                    earliest = k
        if earliest is None:
            break
        ts, msg = mergelogs[earliest].popleft()
        loglines.append("%s %s %s" % (ts, earliest, msg))
    loglines = loglines[-maxlen:]

    logtxt = "\n  ".join(entry.strip() for entry in loglines)
    logfunc("%s\n\n  %s\n", header, logtxt)