6978b5469ab00b1096430667e5e35304d3aeb7a8
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 import subprocess
8
9 from cStringIO import StringIO
10
11 from schema_salad.sourceline import SourceLine
12
13 import cwltool.draft2tool
14 from cwltool.draft2tool import CommandLineTool
15 import cwltool.workflow
16 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
17 from cwltool.load_tool import fetch_document
18 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
19 from cwltool.utils import aslist
20 from cwltool.builder import substitute
21 from cwltool.pack import pack
22
23 import arvados.collection
24 import ruamel.yaml as yaml
25
26 from .arvdocker import arv_docker_get_image
27 from .pathmapper import ArvPathMapper
28 from ._version import __version__
29 from . import done
30
logger = logging.getLogger('arvados.cwl-runner')

# Override cwltool's default filename acceptance pattern so that any file
# name is allowed (Keep can store names cwltool would otherwise reject).
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
34
def trim_listing(obj):
    """Strip redundant fields from Directory objects that are Keep references.

    When a Directory object points into Keep, passing a fully enumerated
    'listing' between cwl-runner instances (e.g. when submitting a job, or
    with the RunInSingleContainer feature) is redundant and potentially very
    expensive, so the 'listing' field is dropped when it is safe to do so.
    Anonymous ("_:") locations are likewise dropped, keeping only the listing.
    """

    location = obj.get("location", "")
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if location.startswith("_:"):
        del obj["location"]
49
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    # Tracks defragmented URLs already fetched so each document is loaded once.
    loaded = set()
    def loadref(b, u):
        # Resolve "u" against base "b"; repeated references return {} so
        # scandeps does not descend into the same document twice.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        # Also follow "run" references (embedded tools/subworkflows).
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        # Upload the primary document itself along with its dependencies.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local locations to their uploaded Keep equivalents;
        # anonymous ("_:") and already-Keep locations are left alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
113
114
def upload_docker(arvrunner, tool):
    """Visitor which uploads Docker images referenced in CommandLineTool objects."""
    if not isinstance(tool, CommandLineTool):
        return
    (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
    if not docker_req:
        return
    if docker_req.get("dockerOutputDirectory"):
        # TODO: can be supported by containers API, but not jobs API.
        raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
            "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
    arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
125
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    document_loader = tool.doc_loader
    workflow_id = tool.tool["id"]
    fetched = document_loader.fetch(workflow_id)
    return pack(document_loader, fetched, workflow_id, tool.metadata)
133
def tag_git_version(packed):
    """Record the source git commit hash on a packed workflow document.

    If the packed document originated from a local file, look up the HEAD
    commit of the containing git repository and store it under the
    "http://schema.org/version" key.  Any failure (directory missing, git
    not installed, not a git checkout) is silently ignored — tagging is
    strictly best-effort.
    """
    # NOTE(review): the original body referenced an undefined global "tool"
    # and raised NameError whenever called; derive the source path from the
    # "packed" document itself instead.  Assumes a file-based packed
    # document carries its source under "id" — TODO confirm for $graph
    # documents, which may not have a top-level "id".
    if packed.get("id", "").startswith("file://"):
        path = os.path.dirname(packed["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            # Best effort only: leave the document untagged on failure.
            pass
        else:
            packed["http://schema.org/version"] = githash
143
144
def upload_job_order(arvrunner, name, tool, job_order):
    """Send local files referenced by the input object to Keep.

    Returns the input object with each 'location' rewritten to the
    corresponding Keep reference.
    """

    def fill_in_secondaries(schema_param, fileobj):
        # Attach the schema-declared secondaryFiles to File entries that
        # do not already specify them; recurse into arrays of files.
        if isinstance(fileobj, dict) and fileobj.get("class") == "File":
            if "secondaryFiles" not in fileobj:
                fileobj["secondaryFiles"] = [
                    {"location": substitute(fileobj["location"], sf), "class": "File"}
                    for sf in schema_param["secondaryFiles"]]

        if isinstance(fileobj, list):
            for element in fileobj:
                fill_in_secondaries(schema_param, element)

    for param in tool.tool["inputs"]:
        key = shortname(param["id"])
        if key in job_order and param.get("secondaryFiles"):
            fill_in_secondaries(param, job_order[key])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    # Drop bookkeeping keys: "id" comes from document loading, and
    # "job_order" is added by cwltool when parameters are supplied on
    # the command line.
    job_order.pop("id", None)
    job_order.pop("job_order", None)

    return job_order
179
def upload_workflow_deps(arvrunner, tool):
    """Upload Docker images and file dependencies for every component of the workflow."""
    # Ensure that Docker images needed by this workflow are available
    tool.visit(partial(upload_docker, arvrunner))

    document_loader = tool.doc_loader

    def visit_tool(deptool):
        # Components without an "id" have no resolvable source to upload.
        if "id" not in deptool:
            return
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(deptool["id"])),
                            document_loader,
                            deptool,
                            deptool["id"],
                            False,
                            include_primary=False)
        # Re-register the updated document so later lookups see the
        # rewritten Keep locations.
        document_loader.idx[deptool["id"]] = deptool

    tool.visit(visit_tool)
198
def arvados_jobs_image(arvrunner, img):
    """Check that the requested arvados/jobs image version is usable.

    Tries to locate the image, pulling and uploading it when necessary,
    and returns the image name on success.  Any failure is re-raised as a
    generic Exception naming the unavailable image.
    """
    docker_requirement = {"dockerPull": img}
    try:
        arv_docker_get_image(arvrunner.api, docker_requirement, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e) )
    return img
207
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        """Set up bookkeeping for a runner submission.

        runner: the coordinating arvados-cwl-runner instance (exposes
            api, keep_client, num_retries, processes, output_callback).
        submit_runner_ram: RAM request for the runner process; 0 means
            use the 1024 default.  Must end up greater than zero.
        submit_runner_image: Docker image for the runner; defaults to the
            arvados/jobs image matching this SDK version.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        # Filled in once the submitted process has an identity / result.
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            # Default RAM request when the caller passed 0 / nothing.
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # No-op in the base class; subclasses override to sync state
        # with their pipeline/container records.
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            # Map the record's final state/exit code onto a cwltool-style
            # process status string.
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # By convention here, exit code 33 signals an
                        # unsupported CWL requirement rather than a crash.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                # Surface the tail of the log collection to help diagnose
                # the failure.
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            # The runner writes its CWL output object as cwl.output.json
            # inside the output collection.
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                # Qualify collection-relative paths as full Keep locations.
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Always deregister this process from the coordinator.
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]