11100: Propagate through to runner. Use intermediate_output_ttl consistently.
[arvados.git] sdk/cwl/arvados_cwl/runner.py
import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

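    # Follow "$import" references; when loadref_run is true, also follow "run"
    # references so that tools invoked by workflow steps are scanned.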
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

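    # Unless the caller asked for dependencies only, also upload the primary
    # workflow document itself.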
    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

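    # Upload everything collected above and build "keep:" references from the
    # two patterns: one for a whole collection, one for a file within a collection.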
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

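    # Rewrite 'location' fields in the original document to point at the
    # uploaded Keep copies, skipping literals ("_:") and existing "keep:" references.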
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
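            # The "True" argument asks arv_docker_get_image to pull the image
            # if it is not already available.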
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

def tag_git_version(packed):
    if packed.get("id", "").startswith("file://"):
        path = os.path.dirname(packed["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the input
    object with 'location' fields rewritten to the corresponding Keep references.
    """

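    # Expand secondaryFiles patterns declared on the tool's inputs so that the
    # secondary files are uploaded along with their primary files.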
    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
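            # Update the loader's cached copy so later lookups of this document
            # see the rewritten keep: references.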
            document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

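    # Reuse an existing collection with the same content (portable data hash)
    # and a matching name prefix rather than saving a duplicate.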
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

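        # submit_runner_ram is expressed in MiB; fall back to 3000 MiB when the
        # caller does not specify a value.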
        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
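                    # By convention, arvados-cwl-runner exits with code 33 to
                    # report an UnsupportedRequirement error.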
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
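            # Paths in cwl.output.json are relative to the output collection;
            # rewrite them as absolute "keep:" references.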
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]