Merge branch '10812-submit-runner-image' refs #10812
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import re
7 from cStringIO import StringIO
8
9 from schema_salad.sourceline import SourceLine
10
11 import cwltool.draft2tool
12 from cwltool.draft2tool import CommandLineTool
13 import cwltool.workflow
14 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
15 from cwltool.load_tool import fetch_document
16 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
17 from cwltool.utils import aslist
18 from cwltool.builder import substitute
19
20 import arvados.collection
21 import ruamel.yaml as yaml
22
23 from .arvdocker import arv_docker_get_image
24 from .pathmapper import ArvPathMapper
25 from ._version import __version__
26 from . import done
27
logger = logging.getLogger('arvados.cwl-runner')

# Relax cwltool's tool-name acceptance pattern to allow any identifier.
# cwltool's default ACCEPTLIST_RE rejects some names that are valid in
# Arvados contexts — presumably this monkeypatch widens it; confirm against
# the pinned cwltool version before changing.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
31
def trim_listing(obj):
    """Remove redundant fields from Directory objects that are Keep references.

    When a Directory object represents a Keep reference, it is redundant and
    potentially very expensive to pass a fully enumerated Directory object
    between instances of cwl-runner (e.g. when submitting a job, or when using
    the RunInSingleContainer feature), so delete the 'listing' field when it
    is safe to do so.  Anonymous '_:' locations are likewise dropped.
    """

    location = obj.get("location", "")
    if location.startswith("keep:"):
        obj.pop("listing", None)
    if location.startswith("_:"):
        del obj["location"]
46
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    :param arvrunner: the active runner object (supplies API/Keep clients).
    :param name: display name used for the upload collection.
    :param document_loader: schema-salad loader used to fetch referenced docs.
    :param workflowobj: parsed workflow document to scan and update in place.
    :param uri: base URI of workflowobj, used to resolve relative references.
    :param loadref_run: if True, also follow "run" references (workflow steps).
    """

    # Track fragmentless URIs already fetched so each document is loaded once;
    # returning {} for repeats also prevents infinite recursion on cycles.
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    # The workflow document itself is also a dependency to upload.
    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    # Rewrite local references in workflowobj to the uploaded keep: URIs.
    # Anonymous ("_:") and already-keep: locations are left alone.
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
110
111
def upload_docker(arvrunner, tool):
    """Make sure the Docker image required by tool is available in Arvados.

    Recurses into Workflow steps; for a CommandLineTool with a
    DockerRequirement, pulls/uploads the image via arv_docker_get_image.
    """
    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
        return
    if not isinstance(tool, CommandLineTool):
        return
    docker_req, _ = get_feature(tool, "DockerRequirement")
    if not docker_req:
        return
    if docker_req.get("dockerOutputDirectory"):
        # TODO: can be supported by containers API, but not jobs API.
        raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
            "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
    arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
124
def upload_instance(arvrunner, name, tool, job_order):
    """Upload everything a workflow run needs to Keep.

    Uploads the tool's Docker image(s), fills in default secondaryFiles on
    File inputs in job_order, uploads the workflow document's dependencies
    and the job order's File/Directory inputs (both updated in place to
    keep: references), and returns the workflow pathmapper.

    :param arvrunner: active runner object (API/Keep clients, project uuid).
    :param name: display name for the upload collections.
    :param tool: cwltool Process object for the workflow/tool to run.
    :param job_order: input object; mutated in place ("id" removed, file
        locations rewritten to keep: references).
    """
    upload_docker(arvrunner, tool)

    # Recursively add missing secondaryFiles entries for a File input (or a
    # list of them), applying the input schema's suffix substitutions.
    # Defined once here rather than redefined inside the loop below.
    def set_secondary(fileobj, secondary_files):
        if isinstance(fileobj, dict) and fileobj.get("class") == "File":
            if "secondaryFiles" not in fileobj:
                fileobj["secondaryFiles"] = [
                    {"location": substitute(fileobj["location"], sf), "class": "File"}
                    for sf in secondary_files]

        if isinstance(fileobj, list):
            for entry in fileobj:
                set_secondary(entry, secondary_files)

    for t in tool.tool["inputs"]:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            set_secondary(job_order[shortname(t["id"])], t["secondaryFiles"])

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    # Run for its side effect of uploading job_order's files and rewriting
    # their locations in place; the returned mapper is not needed.
    upload_dependencies(arvrunner,
                        os.path.basename(job_order.get("id", "#")),
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper
158
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    Returns img unchanged on success; raises Exception if the image cannot
    be made available.
    """
    docker_req = {"dockerPull": img}
    try:
        arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    except Exception as err:
        raise Exception("Docker image %s is not available\n%s" % (img, err))
    return img
167
class Runner(object):
    """Base class for a submitted runner process that manages a workflow run.

    Subclasses are expected to build the actual job/container spec (via
    arvados_job_spec) and register themselves in arvrunner.processes; done()
    is the completion callback that collects outputs and reports status.
    """

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        self.arvrunner = runner        # parent ArvCwlRunner (API/Keep clients, callbacks)
        self.tool = tool               # cwltool Process for the workflow
        self.job_order = job_order     # input object for the run
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None               # set when the runner process is submitted
        self.final_output = None       # output collection PDH/uuid, set in done()
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        # Default to the arvados/jobs image matching this SDK version unless
        # the user supplied --submit-runner-image.
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        # Default runner RAM is 1024 MiB when unset/zero.
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # No-op here; subclasses may update their pipeline instance component.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload dependencies and return the workflow pathmapper.

        Subclasses call this before building their job/container request.
        Also derives self.name from the tool when not explicitly set.
        """
        if self.name is None:
            self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])

        # Need to filter this out, gets added by cwltool when providing
        # parameters on the command line.
        if "job_order" in self.job_order:
            del self.job_order["job_order"]

        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Completion callback: translate the finished runner record into a
        CWL process status and output object, then notify arvrunner.

        :param record: Arvados job/container record with at least "state",
            "output", "log" and "uuid" fields.
        """
        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    # NOTE(review): exit code 33 appears to be the convention
                    # for UnsupportedRequirement from the inner cwl-runner —
                    # confirm against the runner entry point.
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                # Surface the tail of the log collection to help diagnosis.
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            # The inner cwl-runner writes its CWL output object to
            # cwl.output.json in the output collection.
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            # Rewrite collection-relative paths to full keep: references.
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]