10812: Handle $schema references.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 import subprocess
8
9 from StringIO import StringIO
10
11 from schema_salad.sourceline import SourceLine
12
13 import cwltool.draft2tool
14 from cwltool.draft2tool import CommandLineTool
15 import cwltool.workflow
16 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
17 from cwltool.load_tool import fetch_document
18 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
19 from cwltool.utils import aslist
20 from cwltool.builder import substitute
21 from cwltool.pack import pack
22
23 import arvados.collection
24 import ruamel.yaml as yaml
25
26 from .arvdocker import arv_docker_get_image
27 from .pathmapper import ArvPathMapper
28 from ._version import __version__
29 from . import done
30
31 logger = logging.getLogger('arvados.cwl-runner')
32
33 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
34
def trim_listing(obj):
    """Strip fields from a Directory object that should not be passed along.

    For an object whose 'location' is a Keep reference ("keep:..."), the
    'listing' field is redundant and can be very expensive to hand between
    cwl-runner instances (for example when submitting a job or when using
    the RunInSingleContainer feature), so it is removed if present.

    For an object whose 'location' starts with "_:", the 'location' field
    itself is removed.

    The object is modified in place; nothing is returned.
    """

    location = obj.get("location", "")
    if location.startswith("keep:"):
        # Keep reference: drop the enumerated listing, if there is one.
        obj.pop("listing", None)
    elif location.startswith("_:"):
        del obj["location"]
49
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload everything the workflowobj document depends on to Keep.

    scandeps is used to collect the external references found through
    $import, $include, $schemas, run (only when loadref_run is true), and
    the 'location' of File and Directory objects.

    When workflowobj has an "id" field, the document is re-fetched as raw
    text so that the scan sees it before any preprocessing.

    The 'location' fields and "$schemas" entries of workflowobj are
    rewritten in place to the resolved references.  Returns the path
    mapper that maps the original paths to keep references.
    """

    seen_urls = set()

    def loadref(base, ref):
        # Resolve the reference and drop the fragment so that each document
        # is fetched and parsed at most once.
        target, _ = urlparse.urldefrag(document_loader.fetcher.urljoin(base, ref))
        if target in seen_urls:
            return {}
        seen_urls.add(target)
        # fetch_text returns the raw file, before any preprocessing.
        raw = document_loader.fetch_text(target)
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return yaml.safe_load(StringIO(raw))

    loadref_fields = set(("$import", "run")) if loadref_run else set(("$import",))

    if "id" in workflowobj:
        # Scan the raw file content (before preprocessing) so that external
        # references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])
    else:
        scanobj = workflowobj

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        # Schema documents are uploaded like any other file dependency.
        sc.extend([{"class": "File", "location": schema}
                   for schema in workflowobj["$schemas"]])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" not in p:
            return
        # Anonymous ("_:") and already-uploaded ("keep:") locations are left alone.
        if p["location"].startswith(("_:", "keep:")):
            return
        p["location"] = mapper.mapper(p["location"]).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        workflowobj["$schemas"] = [mapper.mapper(schema).resolved
                                   for schema in workflowobj["$schemas"]]

    return mapper
123
124
def upload_docker(arvrunner, tool):
    """Visitor that uploads the Docker image referenced by a CommandLineTool.

    Tools that are not CommandLineTool instances, or that carry no
    DockerRequirement, are ignored.  Raises UnsupportedRequirement (with
    source line information) when the requirement sets
    'dockerOutputDirectory'.
    """
    if not isinstance(tool, CommandLineTool):
        return

    docker_req, _ = get_feature(tool, "DockerRequirement")
    if not docker_req:
        return

    if docker_req.get("dockerOutputDirectory"):
        # TODO: can be supported by containers API, but not jobs API.
        raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
            "Option 'dockerOutputDirectory' of DockerRequirement not supported.")

    arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
135
def packed_workflow(arvrunner, tool):
    """Return the packed form of the workflow held by `tool`.

    Packing combines all of the workflow's components into one single
    document.  The `arvrunner` argument is accepted but not used here.
    """

    loader = tool.doc_loader
    workflow_id = tool.tool["id"]
    return pack(loader, loader.fetch(workflow_id), workflow_id, tool.metadata)
143
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in `packed`.

    When `tool` was loaded from a "file://" location, `git log` is run in
    the directory containing that file and the hash of the most recent
    first-parent commit is stored in `packed` under the
    "http://schema.org/version" key.

    This is best effort: `packed` is left untouched when no tool is given,
    when the tool id is not a "file://" URL, or when git cannot be run
    successfully in that directory.

    Arguments:
      packed -- the packed workflow document (a dict), updated in place.
      tool   -- the tool object whose `tool["id"]` gives the source
                location.  Optional; the function previously referred to an
                undefined `tool` name and raised NameError on every call.
    """
    if tool is None:
        # Without the tool there is no source location to inspect.
        return

    tool_id = tool.tool["id"]
    prefix = "file://"
    if not tool_id.startswith(prefix):
        return

    path = os.path.dirname(tool_id[len(prefix):])
    try:
        githash = subprocess.check_output(
            ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
            stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # git is missing, the directory does not exist, or it is not inside
        # a git work tree: silently skip tagging.
        return

    packed["http://schema.org/version"] = githash
153
154
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload the local files named in the input object.

    Before uploading, File inputs whose parameter declares secondaryFiles
    but which do not list any themselves get a 'secondaryFiles' list
    derived from the parameter's patterns.  The 'location' fields of
    job_order are then updated in place to the uploaded references, and the
    "id" and "job_order" keys are removed.

    Returns the updated job_order.
    """

    def add_secondary(value, patterns):
        # Fill in secondaryFiles on File objects that lack them, descending
        # into lists of values.
        if isinstance(value, dict) and value.get("class") == "File":
            if "secondaryFiles" not in value:
                value["secondaryFiles"] = [
                    {"location": substitute(value["location"], pattern), "class": "File"}
                    for pattern in patterns]

        if isinstance(value, list):
            for item in value:
                add_secondary(item, patterns)

    for param in tool.tool["inputs"]:
        input_name = shortname(param["id"])
        if input_name in job_order and param.get("secondaryFiles"):
            add_secondary(job_order[input_name], param["secondaryFiles"])

    # Called for its in-place update of job_order; the returned path mapper
    # is not needed here.
    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # "job_order" needs to be filtered out as well: it gets added by cwltool
    # when providing parameters on the command line.
    for unwanted in ("id", "job_order"):
        if unwanted in job_order:
            del job_order[unwanted]

    return job_order
189
def upload_workflow_deps(arvrunner, tool):
    """Upload the Docker images and file dependencies of every tool in a workflow.

    Each visited tool document that has an "id" gets its dependencies
    uploaded (without the primary document itself) and is then stored back
    into the document loader's index under that id.
    """
    # Ensure that Docker images needed by this workflow are available
    tool.visit(partial(upload_docker, arvrunner))

    loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(deptool["id"])),
                            loader,
                            deptool,
                            deptool["id"],
                            False,
                            include_primary=False)
        # Make the loader serve the updated document from now on.
        loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)
208
def arvados_jobs_image(arvrunner, img):
    """Check that the requested arvados/jobs image is available, fetching it if needed.

    Returns `img` on success.  Any failure while getting the image is
    re-raised as an Exception naming the image and the underlying error.
    """

    image_spec = {"dockerPull": img}
    try:
        arv_docker_get_image(arvrunner.api, image_spec, True, arvrunner.project_uuid)
    except Exception as err:
        message = "Docker image %s is not available\n%s" % (img, err)
        raise Exception(message)
    else:
        return img
217
class Runner(object):
    """Common base for runner processes: each one submits an instance of
    arvados-cwl-runner and then waits for its final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        """Store the submission settings.

        A falsy submit_runner_ram selects the default of 1024.  If the
        resulting value is not greater than zero, Exception is raised.
        Without a submit_runner_image, "arvados/jobs:" followed by this
        package's version is used as the jobs image.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error

        # State that is filled in as the process runs.
        self.running = False
        self.uuid = None
        self.final_output = None

        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        self.submit_runner_ram = submit_runner_ram or 1024
        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        """Hook that does nothing in this base class."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from the record's state and exit code,
        prints the tail of the log on failure, reads cwl.output.json from
        the output collection, and passes the outputs and status to the
        runner's output callback.  If anything goes wrong while doing so,
        the callback receives an empty output and "permanentFail" instead.
        In every case the record's uuid is removed from the runner's
        process table.
        """

        try:
            if record["state"] != "Complete":
                status = "permanentFail"
            else:
                exit_code = record.get("exit_code")
                if exit_code is None or exit_code == 0:
                    status = "success"
                elif exit_code == 33:
                    status = "UnsupportedRequirement"
                else:
                    status = "permanentFail"

            def open_collection(locator):
                return arvados.collection.CollectionReader(
                    locator,
                    api_client=self.arvrunner.api,
                    keep_client=self.arvrunner.keep_client,
                    num_retries=self.arvrunner.num_retries)

            outputs = {}

            if status == "permanentFail":
                log_collection = open_collection(record["log"])
                done.logtail(log_collection, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            output_collection = open_collection(self.final_output)
            if "cwl.output.json" in output_collection:
                with output_collection.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)

            def keepify(fileobj):
                # Turn paths relative to the output collection into keep references.
                path = fileobj["location"]
                if path.startswith("keep:"):
                    return
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, status)
        finally:
            uuid = record["uuid"]
            processes = self.arvrunner.processes
            if uuid in processes:
                del processes[uuid]