Merge branch 'master' into 11840-unique-constraint-untrash-coll
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 import os
2 import urlparse
3 from functools import partial
4 import logging
5 import json
6 import subprocess
7
8 from StringIO import StringIO
9
10 from schema_salad.sourceline import SourceLine
11
12 import cwltool.draft2tool
13 from cwltool.draft2tool import CommandLineTool
14 import cwltool.workflow
15 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
16 from cwltool.load_tool import fetch_document
17 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
18 from cwltool.utils import aslist
19 from cwltool.builder import substitute
20 from cwltool.pack import pack
21
22 import arvados.collection
23 import ruamel.yaml as yaml
24
25 from .arvdocker import arv_docker_get_image
26 from .pathmapper import ArvPathMapper, trim_listing
27 from ._version import __version__
28 from . import done
29
30 logger = logging.getLogger('arvados.cwl-runner')
31
32 def trim_anonymous_location(obj):
33     """Remove 'location' field from File and Directory literals.
34
35     To make internal handling easier, literals are assigned a random id for
36     'location'.  However, when writing the record back out, this can break
37     reproducibility.  Since it is valid for literals not have a 'location'
38     field, remove it.
39
40     """
41
42     if obj.get("location", "").startswith("_:"):
43         del obj["location"]
44
45 def upload_dependencies(arvrunner, name, document_loader,
46                         workflowobj, uri, loadref_run, include_primary=True):
47     """Upload the dependencies of the workflowobj document to Keep.
48
49     Returns a pathmapper object mapping local paths to keep references.  Also
50     does an in-place update of references in "workflowobj".
51
52     Use scandeps to find $import, $include, $schemas, run, File and Directory
53     fields that represent external references.
54
55     If workflowobj has an "id" field, this will reload the document to ensure
56     it is scanning the raw document prior to preprocessing.
57     """
58
59     loaded = set()
60     def loadref(b, u):
61         joined = document_loader.fetcher.urljoin(b, u)
62         defrg, _ = urlparse.urldefrag(joined)
63         if defrg not in loaded:
64             loaded.add(defrg)
65             # Use fetch_text to get raw file (before preprocessing).
66             text = document_loader.fetch_text(defrg)
67             if isinstance(text, bytes):
68                 textIO = StringIO(text.decode('utf-8'))
69             else:
70                 textIO = StringIO(text)
71             return yaml.safe_load(textIO)
72         else:
73             return {}
74
75     if loadref_run:
76         loadref_fields = set(("$import", "run"))
77     else:
78         loadref_fields = set(("$import",))
79
80     scanobj = workflowobj
81     if "id" in workflowobj:
82         # Need raw file content (before preprocessing) to ensure
83         # that external references in $include and $mixin are captured.
84         scanobj = loadref("", workflowobj["id"])
85
86     sc = scandeps(uri, scanobj,
87                   loadref_fields,
88                   set(("$include", "$schemas", "location")),
89                   loadref, urljoin=document_loader.fetcher.urljoin)
90
91     normalizeFilesDirs(sc)
92
93     if include_primary and "id" in workflowobj:
94         sc.append({"class": "File", "location": workflowobj["id"]})
95
96     if "$schemas" in workflowobj:
97         for s in workflowobj["$schemas"]:
98             sc.append({"class": "File", "location": s})
99
100     mapper = ArvPathMapper(arvrunner, sc, "",
101                            "keep:%s",
102                            "keep:%s/%s",
103                            name=name,
104                            single_collection=True)
105
106     def setloc(p):
107         if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
108             p["location"] = mapper.mapper(p["location"]).resolved
109     adjustFileObjs(workflowobj, setloc)
110     adjustDirObjs(workflowobj, setloc)
111
112     if "$schemas" in workflowobj:
113         sch = []
114         for s in workflowobj["$schemas"]:
115             sch.append(mapper.mapper(s).resolved)
116         workflowobj["$schemas"] = sch
117
118     return mapper
119
120
121 def upload_docker(arvrunner, tool):
122     """Uploads Docker images used in CommandLineTool objects."""
123
124     if isinstance(tool, CommandLineTool):
125         (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
126         if docker_req:
127             if docker_req.get("dockerOutputDirectory"):
128                 # TODO: can be supported by containers API, but not jobs API.
129                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
130                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
131             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
132     elif isinstance(tool, cwltool.workflow.Workflow):
133         for s in tool.steps:
134             upload_docker(arvrunner, s.embedded_tool)
135
136 def packed_workflow(arvrunner, tool):
137     """Create a packed workflow.
138
139     A "packed" workflow is one where all the components have been combined into a single document."""
140
141     return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
142                 tool.tool["id"], tool.metadata)
143
144 def tag_git_version(packed):
145     if tool.tool["id"].startswith("file://"):
146         path = os.path.dirname(tool.tool["id"][7:])
147         try:
148             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
149         except (OSError, subprocess.CalledProcessError):
150             pass
151         else:
152             packed["http://schema.org/version"] = githash
153
154
155 def upload_job_order(arvrunner, name, tool, job_order):
156     """Upload local files referenced in the input object and return updated input
157     object with 'location' updated to the proper keep references.
158     """
159
160     for t in tool.tool["inputs"]:
161         def setSecondary(fileobj):
162             if isinstance(fileobj, dict) and fileobj.get("class") == "File":
163                 if "secondaryFiles" not in fileobj:
164                     fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
165
166             if isinstance(fileobj, list):
167                 for e in fileobj:
168                     setSecondary(e)
169
170         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
171             setSecondary(job_order[shortname(t["id"])])
172
173     jobmapper = upload_dependencies(arvrunner,
174                                     name,
175                                     tool.doc_loader,
176                                     job_order,
177                                     job_order.get("id", "#"),
178                                     False)
179
180     if "id" in job_order:
181         del job_order["id"]
182
183     # Need to filter this out, gets added by cwltool when providing
184     # parameters on the command line.
185     if "job_order" in job_order:
186         del job_order["job_order"]
187
188     return job_order
189
190 def upload_workflow_deps(arvrunner, tool, override_tools):
191     # Ensure that Docker images needed by this workflow are available
192
193     upload_docker(arvrunner, tool)
194
195     document_loader = tool.doc_loader
196
197     def upload_tool_deps(deptool):
198         if "id" in deptool:
199             upload_dependencies(arvrunner,
200                                 "%s dependencies" % (shortname(deptool["id"])),
201                                 document_loader,
202                                 deptool,
203                                 deptool["id"],
204                                 False,
205                                 include_primary=False)
206             document_loader.idx[deptool["id"]] = deptool
207             override_tools[deptool["id"]] = json.dumps(deptool)
208
209     tool.visit(upload_tool_deps)
210
211 def arvados_jobs_image(arvrunner, img):
212     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
213
214     try:
215         arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
216     except Exception as e:
217         raise Exception("Docker image %s is not available\n%s" % (img, e) )
218     return img
219
220 def upload_workflow_collection(arvrunner, name, packed):
221     collection = arvados.collection.Collection(api_client=arvrunner.api,
222                                                keep_client=arvrunner.keep_client,
223                                                num_retries=arvrunner.num_retries)
224     with collection.open("workflow.cwl", "w") as f:
225         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
226
227     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
228                ["name", "like", name+"%"]]
229     if arvrunner.project_uuid:
230         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
231     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
232
233     if exists["items"]:
234         logger.info("Using collection %s", exists["items"][0]["uuid"])
235     else:
236         collection.save_new(name=name,
237                             owner_uuid=arvrunner.project_uuid,
238                             ensure_unique_name=True,
239                             num_retries=arvrunner.num_retries)
240         logger.info("Uploaded to %s", collection.manifest_locator())
241
242     return collection.portable_data_hash()
243
244
245 class Runner(object):
246     """Base class for runner processes, which submit an instance of
247     arvados-cwl-runner and wait for the final result."""
248
249     def __init__(self, runner, tool, job_order, enable_reuse,
250                  output_name, output_tags, submit_runner_ram=0,
251                  name=None, on_error=None, submit_runner_image=None,
252                  intermediate_output_ttl=0):
253         self.arvrunner = runner
254         self.tool = tool
255         self.job_order = job_order
256         self.running = False
257         self.enable_reuse = enable_reuse
258         self.uuid = None
259         self.final_output = None
260         self.output_name = output_name
261         self.output_tags = output_tags
262         self.name = name
263         self.on_error = on_error
264         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
265         self.intermediate_output_ttl = intermediate_output_ttl
266
267         if submit_runner_ram:
268             self.submit_runner_ram = submit_runner_ram
269         else:
270             self.submit_runner_ram = 3000
271
272         if self.submit_runner_ram <= 0:
273             raise Exception("Value of --submit-runner-ram must be greater than zero")
274
275     def update_pipeline_component(self, record):
276         pass
277
278     def done(self, record):
279         """Base method for handling a completed runner."""
280
281         try:
282             if record["state"] == "Complete":
283                 if record.get("exit_code") is not None:
284                     if record["exit_code"] == 33:
285                         processStatus = "UnsupportedRequirement"
286                     elif record["exit_code"] == 0:
287                         processStatus = "success"
288                     else:
289                         processStatus = "permanentFail"
290                 else:
291                     processStatus = "success"
292             else:
293                 processStatus = "permanentFail"
294
295             outputs = {}
296
297             if processStatus == "permanentFail":
298                 logc = arvados.collection.CollectionReader(record["log"],
299                                                            api_client=self.arvrunner.api,
300                                                            keep_client=self.arvrunner.keep_client,
301                                                            num_retries=self.arvrunner.num_retries)
302                 done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
303
304             self.final_output = record["output"]
305             outc = arvados.collection.CollectionReader(self.final_output,
306                                                        api_client=self.arvrunner.api,
307                                                        keep_client=self.arvrunner.keep_client,
308                                                        num_retries=self.arvrunner.num_retries)
309             if "cwl.output.json" in outc:
310                 with outc.open("cwl.output.json") as f:
311                     if f.size() > 0:
312                         outputs = json.load(f)
313             def keepify(fileobj):
314                 path = fileobj["location"]
315                 if not path.startswith("keep:"):
316                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
317             adjustFileObjs(outputs, keepify)
318             adjustDirObjs(outputs, keepify)
319         except Exception as e:
320             logger.exception("[%s] While getting final output object: %s", self.name, e)
321             self.arvrunner.output_callback({}, "permanentFail")
322         else:
323             self.arvrunner.output_callback(outputs, processStatus)
324         finally:
325             if record["uuid"] in self.arvrunner.processes:
326                 del self.arvrunner.processes[record["uuid"]]