dea47567a767b34dd0252781a1d1031b9990deec
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
15 from cwltool.utils import aslist
16 from cwltool.builder import substitute
17
18 import arvados.collection
19 import ruamel.yaml as yaml
20
21 from .arvdocker import arv_docker_get_image
22 from .pathmapper import ArvPathMapper
23 from ._version import __version__
24
logger = logging.getLogger('arvados.cwl-runner')

# Relax cwltool's input file name whitelist so that any file name is
# accepted (the pattern ".*" matches everything).
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
28
def trim_listing(obj):
    """Strip redundant fields from Directory objects that live in Keep.

    When a Directory's location is a Keep reference, it is redundant and
    potentially very expensive to pass a fully enumerated 'listing' between
    instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so the 'listing' field is deleted when it
    is safe to do so.  Anonymous ("_:") locations are meaningless outside
    the current process and are removed as well.
    """

    location = obj.get("location", "")
    if location.startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if location.startswith("_:"):
        del obj["location"]
43
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    :param arvrunner: runner object providing the Arvados API client and
        project context for the uploads.
    :param name: display name used for the upload collection.
    :param document_loader: cwltool loader; its fetcher is used to resolve
        and fetch referenced documents.
    :param workflowobj: parsed workflow document (updated in place).
    :param uri: base URI of the workflow document.
    :param loadref_run: if True, also follow "run" references (workflow
        steps) when scanning for dependencies.
    """

    loaded = set()
    def loadref(b, u):
        # Resolve u against base b and load the referenced document exactly
        # once; a document already seen resolves to an empty dict so scandeps
        # does not traverse it twice.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # Upload the workflow document itself along with its dependencies.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references in place to their uploaded Keep locations;
        # anonymous ("_:") and already-Keep references are left untouched.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
107
108
def upload_docker(arvrunner, tool):
    """Make every Docker image referenced by `tool` available to Arvados.

    Command line tools with a DockerRequirement have their image uploaded
    via arv_docker_get_image; workflows are handled by recursing into each
    step's embedded tool.

    Raises UnsupportedRequirement if the DockerRequirement uses
    'dockerOutputDirectory', which the jobs API cannot support.
    """
    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
120
def upload_instance(arvrunner, name, tool, job_order):
    """Upload everything a runner instance needs to Keep.

    Uploads docker images, expands declared secondaryFiles patterns on File
    inputs in `job_order`, then uploads the workflow document's dependencies
    and the job order's own file references.  `job_order` is modified in
    place (its "id" key, if any, is removed).

    Returns the pathmapper for the workflow document.
    """
    upload_docker(arvrunner, tool)

    for inp in tool.tool["inputs"]:
        def add_secondary(fobj, schema=inp):
            # Attach the input schema's secondaryFiles patterns to File
            # objects that do not already carry an explicit list; recurse
            # into arrays of Files.
            if isinstance(fobj, dict) and fobj.get("class") == "File":
                if "secondaryFiles" not in fobj:
                    fobj["secondaryFiles"] = [
                        {"location": substitute(fobj["location"], pattern), "class": "File"}
                        for pattern in schema["secondaryFiles"]]

            if isinstance(fobj, list):
                for item in fobj:
                    add_secondary(item, schema)

        if shortname(inp["id"]) in job_order and inp.get("secondaryFiles"):
            add_secondary(job_order[shortname(inp["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    # Upload the job order's file references too; its mapper is not needed
    # afterwards because upload_dependencies rewrites job_order in place.
    upload_dependencies(arvrunner,
                        os.path.basename(job_order.get("id", "#")),
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # "id" only matters for reference resolution; drop it so it is not
    # submitted as if it were a workflow input.
    job_order.pop("id", None)

    return workflowmapper
154
def arvados_jobs_image(arvrunner):
    """Return the arvados/jobs docker image name for this SDK version.

    Verifies up front that the image can be made available via
    arv_docker_get_image; raises Exception with the underlying error if it
    cannot.
    """
    img = "arvados/jobs:" + __version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True,
                             arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
162
class Runner(object):
    """Base class for submitting and tracking a remote cwl-runner process.

    Holds the shared submission state (tool, job order, reuse flag, output
    naming, RAM request) and translates the finished job/container record
    into a cwltool process status via done().  Subclasses are expected to
    provide the actual submission logic; arvados_job_spec() here performs
    the uploads common to all of them.
    """

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None):
        # runner is the coordinating object providing the Arvados API client,
        # process table and output callback used throughout this class.
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name

        # Default the runner RAM request to 1024 (MiB) when unset or zero.
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # Intentionally a no-op in the base class; subclasses may update a
        # pipeline instance component from the record.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload the tool and job order to Keep; return the workflow pathmapper.

        Also picks a display name if none was given and strips artifacts
        cwltool adds to the job order.
        """
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out, gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        # Keep-backed Directory listings are redundant to transmit; trim them.
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Handle completion of the runner job/container.

        Maps the record's state/exit code to a cwltool process status, loads
        the output object (cwl.output.json, if present) from the output
        collection, rewrites relative output paths to keep: references, then
        invokes the output callback and removes this process from the
        runner's process table.
        """
        # Exit code 33 is the sentinel this codebase maps to
        # UnsupportedRequirement; any other nonzero code is a permanent
        # failure.
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = {}
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                # The remote runner writes its cwl output object, if any, to
                # cwl.output.json inside the output collection.
                if "cwl.output.json" in outc:
                    with outc.open("cwl.output.json") as f:
                        if f.size() > 0:
                            outputs = json.load(f)
                def keepify(fileobj):
                    # Make relative output locations absolute keep:
                    # references into the output collection.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best effort: still report a status even if the output
                # object could not be retrieved.
                logger.exception("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]