Merge branch '9307-cwl-use-tmp-output' closes #9307 closes #9308
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15
16 import arvados.collection
17 import ruamel.yaml as yaml
18
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21
22 logger = logging.getLogger('arvados.cwl-runner')
23
24 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
25
26 def trim_listing(obj):
27     """Remove 'listing' field from Directory objects that are keep references.
28
29     When Directory objects represent Keep references, it redundant and
30     potentially very expensive to pass fully enumerated Directory objects
31     between instances of cwl-runner (e.g. a submitting a job, or using the
32     RunInSingleContainer feature), so delete the 'listing' field when it is
33     safe to do so.
34     """
35
36     if obj.get("location", "").startswith("keep:") and "listing" in obj:
37         del obj["listing"]
38     if obj.get("location", "").startswith("_:"):
39         del obj["location"]
40
41 def upload_dependencies(arvrunner, name, document_loader,
42                         workflowobj, uri, loadref_run):
43     """Upload the dependencies of the workflowobj document to Keep.
44
45     Returns a pathmapper object mapping local paths to keep references.  Also
46     does an in-place update of references in "workflowobj".
47
48     Use scandeps to find $import, $include, $schemas, run, File and Directory
49     fields that represent external references.
50
51     If workflowobj has an "id" field, this will reload the document to ensure
52     it is scanning the raw document prior to preprocessing.
53     """
54
55     loaded = set()
56     def loadref(b, u):
57         joined = urlparse.urljoin(b, u)
58         defrg, _ = urlparse.urldefrag(joined)
59         if defrg not in loaded:
60             loaded.add(defrg)
61             # Use fetch_text to get raw file (before preprocessing).
62             text = document_loader.fetch_text(defrg)
63             if isinstance(text, bytes):
64                 textIO = StringIO(text.decode('utf-8'))
65             else:
66                 textIO = StringIO(text)
67             return yaml.safe_load(textIO)
68         else:
69             return {}
70
71     if loadref_run:
72         loadref_fields = set(("$import", "run"))
73     else:
74         loadref_fields = set(("$import",))
75
76     scanobj = workflowobj
77     if "id" in workflowobj:
78         # Need raw file content (before preprocessing) to ensure
79         # that external references in $include and $mixin are captured.
80         scanobj = loadref("", workflowobj["id"])
81
82     sc = scandeps(uri, scanobj,
83                   loadref_fields,
84                   set(("$include", "$schemas", "location")),
85                   loadref)
86
87     normalizeFilesDirs(sc)
88
89     if "id" in workflowobj:
90         sc.append({"class": "File", "location": workflowobj["id"]})
91
92     mapper = ArvPathMapper(arvrunner, sc, "",
93                            "keep:%s",
94                            "keep:%s/%s",
95                            name=name)
96
97     def setloc(p):
98         if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
99             p["location"] = mapper.mapper(p["location"]).resolved
100     adjustFileObjs(workflowobj, setloc)
101     adjustDirObjs(workflowobj, setloc)
102
103     return mapper
104
105
106 def upload_docker(arvrunner, tool):
107     if isinstance(tool, CommandLineTool):
108         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
109         if docker_req:
110             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
111     elif isinstance(tool, cwltool.workflow.Workflow):
112         for s in tool.steps:
113             upload_docker(arvrunner, s.embedded_tool)
114
115
116 class Runner(object):
117     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
118         self.arvrunner = runner
119         self.tool = tool
120         self.job_order = job_order
121         self.running = False
122         self.enable_reuse = enable_reuse
123         self.uuid = None
124         self.final_output = None
125         self.output_name = output_name
126
127     def update_pipeline_component(self, record):
128         pass
129
130     def arvados_job_spec(self, *args, **kwargs):
131         upload_docker(self.arvrunner, self.tool)
132
133         self.name = os.path.basename(self.tool.tool["id"])
134
135         workflowmapper = upload_dependencies(self.arvrunner,
136                                              self.name,
137                                              self.tool.doc_loader,
138                                              self.tool.tool,
139                                              self.tool.tool["id"],
140                                              True)
141
142         jobmapper = upload_dependencies(self.arvrunner,
143                                         os.path.basename(self.job_order.get("id", "#")),
144                                         self.tool.doc_loader,
145                                         self.job_order,
146                                         self.job_order.get("id", "#"),
147                                         False)
148
149         adjustDirObjs(self.job_order, trim_listing)
150
151         if "id" in self.job_order:
152             del self.job_order["id"]
153
154         return workflowmapper
155
156
157     def done(self, record):
158         if record["state"] == "Complete":
159             if record.get("exit_code") is not None:
160                 if record["exit_code"] == 33:
161                     processStatus = "UnsupportedRequirement"
162                 elif record["exit_code"] == 0:
163                     processStatus = "success"
164                 else:
165                     processStatus = "permanentFail"
166             else:
167                 processStatus = "success"
168         else:
169             processStatus = "permanentFail"
170
171         outputs = None
172         try:
173             try:
174                 self.final_output = record["output"]
175                 outc = arvados.collection.CollectionReader(self.final_output,
176                                                            api_client=self.arvrunner.api,
177                                                            keep_client=self.arvrunner.keep_client,
178                                                            num_retries=self.arvrunner.num_retries)
179                 with outc.open("cwl.output.json") as f:
180                     outputs = json.load(f)
181                 def keepify(fileobj):
182                     path = fileobj["location"]
183                     if not path.startswith("keep:"):
184                         fileobj["location"] = "keep:%s/%s" % (record["output"], path)
185                 adjustFileObjs(outputs, keepify)
186                 adjustDirObjs(outputs, keepify)
187             except Exception as e:
188                 logger.error("While getting final output object: %s", e)
189             self.arvrunner.output_callback(outputs, processStatus)
190         finally:
191             del self.arvrunner.processes[record["uuid"]]