10218: Merge branch 'master' into 10218-record-node-info
[arvados.git] / sdk / cwl / arvados_cwl / done.py
1 import re
2 from cwltool.errors import WorkflowException
3 from collections import deque
4
5 def done(self, record, tmpdir, outdir, keepdir):
6     colname = "Output %s of %s" % (record["output"][0:7], self.name)
7
8     # check if collection already exists with same owner, name and content
9     collection_exists = self.arvrunner.api.collections().list(
10         filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
11                  ['portable_data_hash', '=', record["output"]],
12                  ["name", "=", colname]]
13     ).execute(num_retries=self.arvrunner.num_retries)
14
15     if not collection_exists["items"]:
16         # Create a collection located in the same project as the
17         # pipeline with the contents of the output.
18         # First, get output record.
19         collections = self.arvrunner.api.collections().list(
20             limit=1,
21             filters=[['portable_data_hash', '=', record["output"]]],
22             select=["manifest_text"]
23         ).execute(num_retries=self.arvrunner.num_retries)
24
25         if not collections["items"]:
26             raise WorkflowException(
27                 "[job %s] output '%s' cannot be found on API server" % (
28                     self.name, record["output"]))
29
30         # Create new collection in the parent project
31         # with the output contents.
32         self.arvrunner.api.collections().create(body={
33             "owner_uuid": self.arvrunner.project_uuid,
34             "name": colname,
35             "portable_data_hash": record["output"],
36             "manifest_text": collections["items"][0]["manifest_text"]
37         }, ensure_unique_name=True).execute(
38             num_retries=self.arvrunner.num_retries)
39
40     return done_outputs(self, record, tmpdir, outdir, keepdir)
41
42 def done_outputs(self, record, tmpdir, outdir, keepdir):
43     self.builder.outdir = outdir
44     self.builder.pathmapper.keepdir = keepdir
45     return self.collect_outputs("keep:" + record["output"])
46
47 crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
48 timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
49
50 def logtail(logcollection, logger, header, maxlen=25):
51     if len(logcollection) == 0:
52         logger.info(header)
53         logger.info("  ** log is empty **")
54         return
55
56     containersapi = ("crunch-run.txt" in logcollection)
57     mergelogs = {}
58
59     for log in logcollection.keys():
60         if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
61             logname = log[:-4]
62             logt = deque([], maxlen)
63             mergelogs[logname] = logt
64             with logcollection.open(log) as f:
65                 for l in f:
66                     if containersapi:
67                         g = timestamp_re.match(l)
68                         logt.append((g.group(1), g.group(2)))
69                     elif not crunchstat_re.match(l):
70                         logt.append(l)
71
72     if containersapi:
73         keys = mergelogs.keys()
74         loglines = []
75         while True:
76             earliest = None
77             for k in keys:
78                 if mergelogs[k]:
79                     if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
80                         earliest = k
81             if earliest is None:
82                 break
83             ts, msg = mergelogs[earliest].popleft()
84             loglines.append("%s %s %s" % (ts, earliest, msg))
85         loglines = loglines[-maxlen:]
86     else:
87         loglines = mergelogs.values()[0]
88
89     logtxt = "\n  ".join(l.strip() for l in loglines)
90     logger.info(header)
91     logger.info("\n  %s", logtxt)