Merge branch '10793-cwl-continue-on-error' closes #10793
[arvados.git] sdk/cwl/arvados_cwl/runner.py
import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # Anonymous "_:" locations are local identifiers with no retrievable
    # content behind them, so drop them as well.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]

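# A minimal usage sketch: as in Runner.arvados_job_spec below, callers apply
# trim_listing to every Directory in an input object via adjustDirObjs
# (job_order here is a hypothetical, already-uploaded input object):
#
#     adjustDirObjs(job_order, trim_listing)
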
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper

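# Sketch of typical use (hypothetical arvrunner/tool objects; compare
# upload_instance below).  After the call, local references in tool.tool
# have been rewritten in place to "keep:..." locations:
#
#     mapper = upload_dependencies(arvrunner, "my-workflow",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], True)
#     resolved = mapper.mapper("file:///home/me/input.txt").resolved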

def upload_docker(arvrunner, tool):
    """Upload Docker images referenced by DockerRequirement hints or requirements."""
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # Recurse into each step so nested tools' images are uploaded too.
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

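# For reference, the DockerRequirement passed to arv_docker_get_image is a
# plain dict, e.g. {"class": "DockerRequirement", "dockerPull": "debian:8"}
# (image name illustrative only; compare arvados_jobs_image below).
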
def upload_instance(arvrunner, name, tool, job_order):
    """Upload Docker images and file dependencies for a workflow instance.

    Expands declared secondaryFiles in the job order, uploads the workflow
    document and the job order to Keep, and returns the workflow pathmapper.
    """
    upload_docker(arvrunner, tool)

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper

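# For example, given an input declared with secondaryFiles [".bai", "^.crai"],
# a job order entry {"class": "File", "location": "keep:.../x.bam"} gains
#     [{"class": "File", "location": "keep:.../x.bam.bai"},
#      {"class": "File", "location": "keep:.../x.crai"}]
# (substitute() strips one extension for each leading "^" in the pattern;
# locations here are illustrative only).
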
def arvados_jobs_image(arvrunner):
    """Ensure the arvados/jobs image matching this SDK version is available."""
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

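# For example, with __version__ == "1.0.20170216235447" this verifies that
# "arvados/jobs:1.0.20170216235447" can be fetched before submitting the
# runner job (version string illustrative only).
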
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024  # default, in MiB

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # cwltool adds a "job_order" key when parameters are provided on the
        # command line; it is not a real input, so filter it out.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Collect the final output and report status via output_callback."""
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # By convention, the runner exits 33 when the workflow
                        # used a requirement the platform does not support.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                # Make relative output paths absolute "keep:" references.
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]
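
# A minimal sketch of a callback compatible with the output_callback calls
# above (hypothetical; the real callback is a method on the ArvCwlRunner
# object that records the final result and wakes the main loop):
#
#     def output_callback(outputs, processStatus):
#         if processStatus == "success":
#             logger.info("Overall process status is %s", processStatus)
#         else:
#             logger.warn("Overall process status is %s", processStatus)
#         final_status.append(processStatus)
#         final_output.append(outputs)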