Merge branch '10497-cwl-better-errors' refs #10497
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

# Monkey-patch cwltool to accept any file name as a workflow input
# (the default pattern rejects names containing unusual characters).
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.  Anonymous ('_:') locations, which are not resolvable
    references, are deleted as well.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

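# Illustrative sketch (not part of the original file): trim_listing is meant
# to be applied through cwltool's adjustDirObjs helper, as arvados_job_spec()
# does below:
#
#     adjustDirObjs(job_order, trim_listing)
#
# which walks every Directory object in job_order and trims it in place.
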
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    # "keep:%s" is the pattern used for a reference to a whole collection,
    # "keep:%s/%s" for a file within a collection.
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references in place; anonymous ("_:") and existing
        # "keep:" references are left as-is.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper

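# Illustrative sketch (not part of the original file): a caller holding an
# ArvCwlRunner instance ("runner" here is assumed) uploads a tool document's
# dependencies the same way upload_instance() does below:
#
#     mapper = upload_dependencies(runner, "my-workflow", tool.doc_loader,
#                                  tool.tool, tool.tool["id"], True)
#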

def upload_docker(arvrunner, tool):
    """Ensure that Docker images named in DockerRequirements are available
    in Arvados, recursing into workflow steps."""
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def upload_instance(arvrunner, name, tool, job_order):
    """Upload Docker images and dependencies for the tool and job order.

    Fills in default secondaryFiles entries on the job order, uploads both
    documents, and returns the pathmapper for the workflow document.
    """
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper

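# Illustrative sketch (not part of the original file): cwltool's substitute()
# expands a secondaryFiles suffix pattern against the primary file's location;
# each leading "^" strips one extension before the suffix is appended:
#
#     substitute("reads.bam", ".bai")    # -> "reads.bam.bai"
#     substitute("reads.bam", "^.bai")   # -> "reads.bai"
#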
def arvados_jobs_image(arvrunner):
    img = "arvados/jobs:" + __version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
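
# Illustrative sketch (not part of the original file): the image tag tracks
# this SDK's version, so with a hypothetical __version__ of "1.0.0":
#
#     img = arvados_jobs_image(runner)   # -> "arvados/jobs:1.0.0"
#
# raising instead if the image cannot be found or uploaded.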

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            # Default RAM request for the runner process, in MiB.
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out; it gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

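    # Illustrative note (not part of the original file): concrete runners
    # (RunnerJob for the jobs API and RunnerContainer for the containers API,
    # assumed from the wider codebase) build on arvados_job_spec() to produce
    # their actual submission documents.
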
    def done(self, record):
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # By convention, the runner exits 33 to signal that the
                    # workflow used an unsupported feature.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = {}
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                if "cwl.output.json" in outc:
                    with outc.open("cwl.output.json") as f:
                        if f.size() > 0:
                            outputs = json.load(f)
                def keepify(fileobj):
                    # Make relative output paths absolute by prefixing the
                    # output collection's "keep:" reference.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.exception("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]
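
# Illustrative sketch (not part of the original file): given a completed
# record whose "output" is a Keep collection (say the hypothetical portable
# data hash "99999999999999999999999999999999+99"), keepify() rewrites a
# relative output location such as "out.txt" to
# "keep:99999999999999999999999999999999+99/out.txt", while locations already
# starting with "keep:" pass through unchanged.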