1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future.utils import viewvalues
8 from cwltool.errors import WorkflowException
9 from collections import deque
def done(self, record, tmpdir, outdir, keepdir):
    """Copy a finished job's output and log collections into the project.

    For each of the job's output and log, look in the runner's project
    (``self.arvrunner.project_uuid``) for a collection with the expected
    name and portable data hash.  If none exists, create one there from the
    manifest of any collection on the API server with that portable data
    hash.  Finally delegate to ``done_outputs`` and return its result.

    :param record: job record; ``record["output"]`` and ``record["log"]``
        are used as portable data hashes, ``record["uuid"]`` in the log
        collection name.
    :param tmpdir: passed through to ``done_outputs``.
    :param outdir: passed through to ``done_outputs``.
    :param keepdir: passed through to ``done_outputs``.
    :raises WorkflowException: if no collection with the output/log
        portable data hash can be found on the API server.
    """
    api = self.arvrunner.api
    project_uuid = self.arvrunner.project_uuid
    num_retries = self.arvrunner.num_retries

    # (kind, collection name, portable data hash) for each collection to copy.
    cols = [
        ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
        ("log", "Log of %s" % (record["uuid"]), record["log"]),
    ]

    for coltype, colname, colpdh in cols:
        # Check if a collection already exists with the same owner, name
        # and content.
        collection_exists = api.collections().list(
            filters=[["owner_uuid", "=", project_uuid],
                     ['portable_data_hash', '=', colpdh],
                     ["name", "=", colname]]
        ).execute(num_retries=num_retries)

        if collection_exists["items"]:
            continue

        # Create a collection located in the same project as the
        # pipeline with the contents of the output/log.
        # First, get the output/log record.  Only the first match is used,
        # so there is no need to fetch more than one.
        collections = api.collections().list(
            limit=1,
            filters=[['portable_data_hash', '=', colpdh]],
            select=["manifest_text"]
        ).execute(num_retries=num_retries)

        if not collections["items"]:
            raise WorkflowException(
                "[job %s] %s '%s' cannot be found on API server" % (
                    self.name, coltype, colpdh))

        # Create the new collection in the parent project with the
        # output/log contents.  The name must be set here: it is what the
        # existence check above filters on, so without it the copy would
        # never be found again and would be recreated on every call.
        api.collections().create(body={
            "owner_uuid": project_uuid,
            "name": colname,
            "portable_data_hash": colpdh,
            "manifest_text": collections["items"][0]["manifest_text"]
        }, ensure_unique_name=True).execute(
            num_retries=num_retries)

    return done_outputs(self, record, tmpdir, outdir, keepdir)
def done_outputs(self, record, tmpdir, outdir, keepdir):
    """Point the builder at the output locations and collect the outputs.

    Stores ``outdir`` on ``self.builder`` and ``keepdir`` on the builder's
    path mapper, then returns whatever ``self.collect_outputs`` yields for
    the record's output (addressed as ``keep:<output>``) and exit code.
    ``tmpdir`` is accepted but not used here.
    """
    builder = self.builder
    builder.outdir = outdir
    builder.pathmapper.keepdir = keepdir

    output_location = "keep:" + record["output"]
    exit_code = record["exit_code"]
    return self.collect_outputs(output_location, exit_code)
# Matches lines that begin with a "YYYY-MM-DD_HH:MM:SS" stamp, an identifier
# containing "-8i9sb-", two integers and the literal "stderr crunchstat:".
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
# Splits a log line into (timestamp "YYYY-MM-DDTHH:MM:SS.fffZ", rest of line).
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")


def logtail(logcollection, logfunc, header, maxlen=25, include_crunchrun=True):
    """Report the merged tail of a container's log files through ``logfunc``.

    Reads ``stdout.txt`` and ``stderr.txt`` (and ``crunch-run.txt`` when
    ``include_crunchrun`` is true) from ``logcollection``, keeps the last
    ``maxlen`` lines of each, merges them by timestamp and passes the last
    ``maxlen`` merged lines to ``logfunc`` in a single call.

    :param logcollection: object supporting ``len()``, ``in`` and
        ``open(name, encoding=...)`` returning an iterable context manager.
    :param logfunc: logging-style callable taking a format string followed
        by its arguments.
    :param header: text placed before the log lines.
    :param maxlen: maximum number of lines kept per file and in the result.
    :param include_crunchrun: whether ``crunch-run.txt`` is merged in.
    """
    if len(logcollection) == 0:
        logfunc("%s\n%s", header, " ** log is empty **")
        return

    mergelogs = {}
    logfiles = ["stdout.txt", "stderr.txt"]
    if include_crunchrun:
        logfiles.append("crunch-run.txt")

    for log in logfiles:
        if log not in logcollection:
            continue
        logname = log[:-4]  # trim off the .txt
        logt = deque([], maxlen)
        mergelogs[logname] = logt
        last_ts = ""
        with logcollection.open(log, encoding="utf-8") as f:
            for line in f:
                g = timestamp_re.match(line)
                if g is None:
                    # A line without a leading timestamp would make
                    # g.group() raise AttributeError.  Keep it, filed under
                    # the previous timestamp from this file so it stays
                    # next to the line it follows.
                    logt.append((last_ts, line.rstrip("\r\n")))
                    continue
                last_ts = g.group(1)
                logt.append((last_ts, g.group(2)))

    keys = list(mergelogs)
    loglines = []

    # We assume the log lines are all in order, so this is a straight
    # linear merge where we look at the next timestamp of each log and
    # take whichever one is earliest.
    while True:
        pending = [k for k in keys if mergelogs[k]]
        if not pending:
            break
        # min() returns the first minimal element, so ties go to the log
        # that comes first in ``keys``.
        earliest = min(pending, key=lambda k: mergelogs[k][0][0])
        ts, msg = mergelogs[earliest].popleft()
        loglines.append("%s %s %s" % (ts, earliest, msg))
    # Each file contributed up to maxlen lines; keep only the overall tail.
    loglines = loglines[-maxlen:]

    logtxt = "\n ".join(entry.strip() for entry in loglines)
    logfunc("%s\n\n %s\n", header, logtxt)