test_with_arvbox determines the correct arvados/jobs image to pull by querying
[arvados.git] / sdk / cwl / arvados_cwl / done.py
1 import re
2 from cwltool.errors import WorkflowException
3 from collections import deque
4
def done(self, record, tmpdir, outdir, keepdir):
    """Copy a finished step's output and log collections into the project, then collect outputs.

    For each of the output and the log collection referenced by *record*, a
    new collection owned by ``self.arvrunner.project_uuid`` is created with
    the same manifest. This is skipped when a collection with the same owner,
    name and portable data hash already exists there.

    :param record: step record; its ``"output"``, ``"uuid"`` and ``"log"``
        entries are read.
    :param tmpdir: not used here; forwarded to :func:`done_outputs`.
    :param outdir: forwarded to :func:`done_outputs`.
    :param keepdir: forwarded to :func:`done_outputs`.
    :returns: the result of :func:`done_outputs`.
    :raises WorkflowException: if the source collection for a portable data
        hash cannot be found on the API server.
    """
    runner = self.arvrunner
    output_pdh = record["output"]
    # (role, collection name, portable data hash) for each collection to copy.
    wanted = (
        ("output", "Output %s of %s" % (output_pdh[0:7], self.name), output_pdh),
        ("log", "Log of %s" % (record["uuid"]), record["log"]),
    )

    for role, label, pdh in wanted:
        # An identical copy (same owner, name and content) means nothing to do.
        existing = runner.api.collections().list(
            filters=[["owner_uuid", "=", runner.project_uuid],
                     ['portable_data_hash', '=', pdh],
                     ["name", "=", label]]
        ).execute(num_retries=runner.num_retries)
        if existing["items"]:
            continue

        # Look up any collection with this content to obtain its manifest.
        source = runner.api.collections().list(
            limit=1,
            filters=[['portable_data_hash', '=', pdh]],
            select=["manifest_text"]
        ).execute(num_retries=runner.num_retries)
        if not source["items"]:
            raise WorkflowException(
                "[job %s] %s '%s' cannot be found on API server" % (
                    self.name, role, pdh))

        # Create the copy in the runner's project; the server may adjust the
        # name if it collides (ensure_unique_name=True).
        new_collection = {
            "owner_uuid": runner.project_uuid,
            "name": label,
            "portable_data_hash": pdh,
            "manifest_text": source["items"][0]["manifest_text"],
        }
        runner.api.collections().create(
            body=new_collection, ensure_unique_name=True
        ).execute(num_retries=runner.num_retries)

    return done_outputs(self, record, tmpdir, outdir, keepdir)
45
def done_outputs(self, record, tmpdir, outdir, keepdir):
    """Point the step's builder at its output locations and collect outputs.

    Sets ``self.builder.outdir`` to *outdir* and the builder's path mapper
    ``keepdir`` to *keepdir*. It then returns ``self.collect_outputs`` applied
    to ``"keep:" + record["output"]``.

    :param tmpdir: accepted for signature compatibility; not used.
    """
    builder = self.builder
    builder.outdir = outdir
    builder.pathmapper.keepdir = keepdir
    output_ref = "keep:" + record["output"]
    return self.collect_outputs(output_ref)
50
# Matches "crunchstat:" stderr lines (date_time, an "8i9sb" uuid, two ints,
# "stderr crunchstat:"); such lines are dropped from non-container log tails.
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
# Splits a container log line into (ISO-8601 timestamp with fractional
# seconds and trailing "Z", rest of the line).
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")


def logtail(logcollection, logger, header, maxlen=25):
    """Log *header* followed by the last *maxlen* lines of a log collection.

    If the collection contains ``crunch-run.txt`` it is treated as a
    containers-API log. In that case ``crunch-run.txt``, ``stdout.txt`` and
    ``stderr.txt`` are merged by timestamp, and each line is shown as
    ``"<timestamp> <source> <message>"``, where source is the file name
    without ``.txt``. Otherwise the tail of the first file is shown, with
    lines matching ``crunchstat_re`` removed.

    :param logcollection: mapping-like collection supporting ``len``, ``in``,
        ``keys()`` and ``open(name)`` returning a line-iterable context manager.
    :param logger: object with an ``info`` method (e.g. ``logging.Logger``).
    :param header: message logged before the tail.
    :param maxlen: maximum number of lines to show.
    """
    if len(logcollection) == 0:
        logger.info(header)
        logger.info("  ** log is empty **")
        return

    containersapi = ("crunch-run.txt" in logcollection)
    mergelogs = {}

    for log in logcollection.keys():
        if containersapi and log not in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
            continue
        logname = log[:-4]  # drop the ".txt" suffix
        # Bounded deque: only the last maxlen entries of each file are kept.
        logt = deque([], maxlen)
        mergelogs[logname] = logt
        last_ts = ""
        with logcollection.open(log) as f:
            for l in f:
                if containersapi:
                    g = timestamp_re.match(l)
                    if g is not None:
                        last_ts, msg = g.group(1), g.group(2)
                    else:
                        # Lines without a timestamp (blank, truncated or
                        # continuation lines) used to raise AttributeError
                        # here. Keep them, ordered with the preceding line of
                        # the same file ("" if none precedes them).
                        msg = l.rstrip("\r\n")
                    logt.append((last_ts, msg))
                elif not crunchstat_re.match(l):
                    logt.append(l)

    if containersapi:
        keys = list(mergelogs)
        loglines = []
        # Repeatedly take the entry with the smallest head timestamp (compared
        # as strings); on ties the earlier key wins. This assumes each file is
        # already in timestamp order.
        while True:
            earliest = None
            for k in keys:
                if mergelogs[k]:
                    if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
                        earliest = k
            if earliest is None:
                break
            ts, msg = mergelogs[earliest].popleft()
            loglines.append("%s %s %s" % (ts, earliest, msg))
        loglines = loglines[-maxlen:]
    else:
        # dict views are not indexable on Python 3, so materialize first.
        loglines = list(mergelogs.values())[0]

    logtxt = "\n  ".join(l.strip() for l in loglines)
    logger.info(header)
    logger.info("\n  %s", logtxt)
94     logger.info(header)
95     logger.info("\n  %s", logtxt)