import re

from cwltool.errors import WorkflowException
from collections import deque
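

# done() runs after a job or container finishes.  It preserves the output and
# log as collections owned by the runner's project (creating copies only when
# an identical collection is not already there), then hands off to
# done_outputs() to collect the CWL outputs from Keep.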
def done(self, record, tmpdir, outdir, keepdir):
    cols = [
        ("output", "Output %s of %s" % (record["output"][0:7], self.name), record["output"]),
        ("log", "Log of %s" % (record["uuid"]), record["log"])
    ]

    for coltype, colname, colpdh in cols:
        # check if collection already exists with same owner, name and content
        collection_exists = self.arvrunner.api.collections().list(
            filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                     ['portable_data_hash', '=', colpdh],
                     ["name", "=", colname]]
        ).execute(num_retries=self.arvrunner.num_retries)

        if not collection_exists["items"]:
            # Create a collection located in the same project as the
            # pipeline with the contents of the output/log.
            # First, get output/log record.
            collections = self.arvrunner.api.collections().list(
                filters=[['portable_data_hash', '=', colpdh]],
                select=["manifest_text"]
            ).execute(num_retries=self.arvrunner.num_retries)

            if not collections["items"]:
                raise WorkflowException(
                    "[job %s] %s '%s' cannot be found on API server" % (
                        self.name, coltype, colpdh))

            # Create new collection in the parent project
            # with the output/log contents.
            self.arvrunner.api.collections().create(body={
                "owner_uuid": self.arvrunner.project_uuid,
                "name": colname,
                "portable_data_hash": colpdh,
                "manifest_text": collections["items"][0]["manifest_text"]
            }, ensure_unique_name=True).execute(
                num_retries=self.arvrunner.num_retries)

    return done_outputs(self, record, tmpdir, outdir, keepdir)
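

# done_outputs() points the CWL builder at the job's output directory and the
# Keep mount, then collects the CWL outputs from the output collection in Keep.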
def done_outputs(self, record, tmpdir, outdir, keepdir):
    self.builder.outdir = outdir
    self.builder.pathmapper.keepdir = keepdir
    return self.collect_outputs("keep:" + record["output"])
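

# crunchstat_re matches the resource-usage lines that crunchstat writes to
# stderr, so they can be filtered out of the log tail.  timestamp_re captures
# the leading timestamp and the remainder of a containers API log line so
# that lines from several log files can be merged in time order.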
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
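

# logtail() logs up to the last maxlen lines of a finished job's or
# container's log collection.  With the containers API the tails of
# crunch-run.txt, stdout.txt and stderr.txt are merged by timestamp;
# otherwise a single job log is used with crunchstat lines filtered out.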
def logtail(logcollection, logger, header, maxlen=25):
    if len(logcollection) == 0:
        logger.info(header)
        logger.info("  ** log is empty **")
        return

    containersapi = ("crunch-run.txt" in logcollection)
    mergelogs = {}

    for log in logcollection.keys():
        if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
            logname = log[:-4]  # strip the ".txt" extension
            logt = deque([], maxlen)
            mergelogs[logname] = logt
            with logcollection.open(log) as f:
                for l in f:
                    if containersapi:
                        g = timestamp_re.match(l)
                        logt.append((g.group(1), g.group(2)))
                    elif not crunchstat_re.match(l):
                        logt.append(l)

    if containersapi:
        # Merge the per-file tails into a single tail ordered by timestamp.
        keys = mergelogs.keys()
        loglines = []
        while True:
            earliest = None
            for k in keys:
                if mergelogs[k]:
                    if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
                        earliest = k
            if earliest is None:
                break
            ts, msg = mergelogs[earliest].popleft()
            loglines.append("%s %s %s" % (ts, earliest, msg))
        loglines = loglines[-maxlen:]
    else:
        loglines = list(mergelogs.values())[0]

    logtxt = "\n  ".join(l.strip() for l in loglines)
    logger.info(header)
    logger.info("\n  %s", logtxt)