import os
import urlparse
from functools import partial
import logging
import json
import re
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

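# Loosen cwltool's filename acceptance pattern so that any filename is allowed.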
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

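    # Upload everything scandeps collected to Keep.  The two patterns are used
    # to build "keep:" locations for whole collections and for individual
    # files within a collection.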
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

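    # Rewrite "location" fields in the original workflowobj to point at the
    # uploaded Keep copies; literal ("_:") and existing "keep:" references are
    # left alone.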
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Upload the Docker images used by the CommandLineTools in this tool or workflow."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

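# Record the git commit hash of the workflow's source checkout in the packed
# document, under the "http://schema.org/version" key.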
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' fields rewritten to Keep references.
    """

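    # Expand secondaryFiles patterns declared on the workflow's inputs so the
    # corresponding secondary files are uploaded along with their primary
    # files.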
    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when parameters are
    # provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

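    # Reuse an existing collection with identical content and a matching name
    # prefix instead of saving a duplicate copy.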
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

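        # RAM (in MiB) to request for the runner process; defaults to 1024.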
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
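                # arvados-cwl-runner signals an unsupported workflow
                # requirement by exiting with code 33.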
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
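            # Make relative output paths absolute by prefixing them with the
            # output collection's locator.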
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]