Merge branch '11960-trash-events'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import urlparse
7 from functools import partial
8 import logging
9 import json
10 import subprocess
11
12 from StringIO import StringIO
13
14 from schema_salad.sourceline import SourceLine
15
16 import cwltool.draft2tool
17 from cwltool.draft2tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
25
26 import arvados.collection
27 import ruamel.yaml as yaml
28
29 from .arvdocker import arv_docker_get_image
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
32 from . import done
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
def trim_anonymous_location(obj):
    """Strip the 'location' field from anonymous File/Directory literals.

    Literals are given a random id (prefixed with "_:") as their 'location'
    to make internal handling easier.  Writing that id back out can break
    reproducibility, and it is valid for a literal not to have a 'location'
    field, so the field is deleted in place when it holds such an id.

    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        # Real (non-anonymous) location, or no location at all: keep as is.
        return
    del obj["location"]
48
def find_defaults(d, op):
    """Call op on every dict inside d that has a "default" key.

    d is searched recursively: lists element by element, dicts value by
    value.  A dict that itself contains "default" is handed to op and is
    not searched any deeper.  Anything that is neither a list nor a dict
    is ignored.
    """
    if isinstance(d, list):
        for item in d:
            find_defaults(item, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            # values() instead of the Python-2-only itervalues(), so the
            # helper runs unchanged on both Python 2 and Python 3.
            for value in d.values():
                find_defaults(value, op)
59
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload everything the workflowobj document depends on to Keep.

    scandeps is used to collect the external references found in $import,
    $include, $schemas, run, File and Directory fields.  The references in
    "workflowobj" are rewritten in place to point at Keep, and the
    ArvPathMapper that maps the original paths to keep references is
    returned.

    When workflowobj has an "id" field the document is re-read from that id
    so that the raw text (before preprocessing) is what gets scanned.

    "run" references are only followed when loadref_run is true.  The
    primary document itself is added to the upload set only when
    include_primary is true and workflowobj has an "id".
    """

    already_loaded = set()

    def loadref(base, ref):
        # Resolve the reference and drop any fragment, so each document is
        # fetched and parsed at most once.
        target, _ = urlparse.urldefrag(document_loader.fetcher.urljoin(base, ref))
        if target in already_loaded:
            return {}
        already_loaded.add(target)
        # fetch_text gives the raw file (before preprocessing).
        raw = document_loader.fetch_text(target)
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return yaml.safe_load(StringIO(raw))

    loadref_fields = set(["$import"])
    if loadref_run:
        loadref_fields.add("run")

    # The raw file content (before preprocessing) is needed so that external
    # references in $include and $mixin are captured.
    if "id" in workflowobj:
        scanobj = loadref("", workflowobj["id"])
    else:
        scanobj = workflowobj

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(["$include", "$schemas", "location"]),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for schema in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": schema})

    def capture_default(obj):
        # Mutable holder: the nested callback cannot rebind an outer local
        # on Python 2.
        state = {"drop": False}

        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f.pop("path")
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # The default points at something that does not exist:
                # take it out of the upload set ...
                sc[:] = [entry for entry in sc if entry["location"] != f["location"]]
                # ... and mark the "default" itself for removal.
                state["drop"] = True

        visit_class(obj["default"], ("File", "Directory"), add_default)
        if state["drop"]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" not in p:
            return
        loc = p["location"]
        # Anonymous literals ("_:") and existing keep references are left
        # untouched.
        if loc.startswith("_:") or loc.startswith("keep:"):
            return
        p["location"] = mapper.mapper(loc).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        workflowobj["$schemas"] = [mapper.mapper(schema).resolved
                                   for schema in workflowobj["$schemas"]]

    return mapper
151
152
def upload_docker(arvrunner, tool):
    """Upload the Docker images used by CommandLineTool objects.

    For a CommandLineTool, its DockerRequirement (if any) is passed to
    arv_docker_get_image.  For a Workflow, every step's embedded tool is
    processed recursively.  Any other kind of tool is ignored.

    Raises UnsupportedRequirement when the DockerRequirement sets
    'dockerOutputDirectory'.
    """

    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if not docker_req:
            return
        if docker_req.get("dockerOutputDirectory"):
            # TODO: can be supported by containers API, but not jobs API.
            raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
        arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
167
def packed_workflow(arvrunner, tool):
    """Return the packed form of the workflow behind tool.

    A "packed" workflow is one where all the components have been combined
    into a single document.  arvrunner is accepted but not used here.
    """

    loader = tool.doc_loader
    document = loader.fetch(tool.tool["id"])
    return pack(loader, document, tool.tool["id"], tool.metadata)
175
def tag_git_version(packed, tool=None):
    """Record the git commit of the workflow's source directory in packed.

    If the workflow id is a "file://" URI, ``git log`` is run in the
    directory containing that file and the newest first-parent commit hash
    is stored in ``packed["http://schema.org/version"]``.  This is best
    effort: if git cannot be run or fails, packed is left untouched.

    packed -- the packed workflow document (a dict); updated in place.
    tool   -- optional object whose ``tool["id"]`` is the workflow id.  When
              omitted, the "id" entry of packed is used instead, and a
              document without one is left unchanged.

    This function used to read an undefined name ``tool`` and raised
    NameError on every call; the optional parameter supplies it while
    keeping the one-argument call form valid.
    """
    if tool is not None:
        tool_id = tool.tool["id"]
    else:
        tool_id = packed.get("id") or ""

    prefix = "file://"
    if not tool_id.startswith(prefix):
        return

    path = os.path.dirname(tool_id[len(prefix):])
    try:
        githash = subprocess.check_output(
            ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
            stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # git missing, directory missing, or not a git checkout: nothing
        # to record.
        return
    packed["http://schema.org/version"] = githash
185
186
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the
    updated input object, with 'location' fields pointing at the proper
    keep references.

    Before uploading, File inputs whose tool input declares secondaryFiles
    and which do not already carry a 'secondaryFiles' list get one built
    from the declared patterns.  The "id" and "job_order" keys are removed
    from job_order before it is returned.
    """

    def fill_secondary(value, patterns):
        # Lists are walked recursively; only File objects without an
        # explicit 'secondaryFiles' entry are filled in.
        if isinstance(value, list):
            for element in value:
                fill_secondary(element, patterns)
        elif isinstance(value, dict) and value.get("class") == "File":
            if "secondaryFiles" not in value:
                value["secondaryFiles"] = [
                    {"location": substitute(value["location"], sf), "class": "File"}
                    for sf in patterns]

    for inp in tool.tool["inputs"]:
        key = shortname(inp["id"])
        if key in job_order and inp.get("secondaryFiles"):
            fill_secondary(job_order[key], inp["secondaryFiles"])

    # Called for its in-place update of job_order; the returned mapper is
    # not needed here.
    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # "job_order" needs to be filtered out as well: it gets added by cwltool
    # when providing parameters on the command line.
    for unwanted in ("id", "job_order"):
        if unwanted in job_order:
            del job_order[unwanted]

    return job_order
221
def upload_workflow_deps(arvrunner, tool, override_tools):
    """Upload the Docker images and file dependencies of a workflow.

    Every tool object visited through tool.visit that has an "id" has its
    dependencies uploaded (without following "run" references and without
    the primary document).  The updated tool object is then stored in the
    document loader's index, and its JSON serialization is stored in
    override_tools under the same id.
    """

    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return
        tool_id = deptool["id"]
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(tool_id)),
                            loader,
                            deptool,
                            tool_id,
                            False,
                            include_primary=False)
        loader.idx[tool_id] = deptool
        override_tools[tool_id] = json.dumps(deptool)

    tool.visit(upload_tool_deps)
242
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    Returns img unchanged on success.  Any failure from
    arv_docker_get_image is re-raised as a plain Exception whose message
    names the image and includes the original error text.
    """

    pull_request = {"dockerPull": img}
    try:
        arv_docker_get_image(arvrunner.api, pull_request, True, arvrunner.project_uuid)
    except Exception as err:
        message = "Docker image %s is not available\n%s" % (img, err)
        raise Exception(message)
    else:
        return img
251
def upload_workflow_collection(arvrunner, name, packed):
    """Store the packed workflow as "workflow.cwl" in a collection.

    The document is serialized as indented JSON with sorted keys.  If a
    collection with the same portable data hash and a name starting with
    `name` already exists (restricted to arvrunner.project_uuid when that
    is set), it is reused; otherwise a new collection is saved.

    Returns the portable data hash of the collection.
    """
    retries = arvrunner.num_retries
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [
        ["portable_data_hash", "=", collection.portable_data_hash()],
        ["name", "like", name+"%"],
    ]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    matches = exists["items"]
    if not matches:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", matches[0]["uuid"])

    return collection.portable_data_hash()
275
276
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        """Store the submission settings on the instance.

        A falsy submit_runner_ram selects the default of 3000; a value that
        ends up less than or equal to zero raises Exception.  When
        submit_runner_image is not given, the image is
        "arvados/jobs:" followed by the package version.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.intermediate_output_ttl = intermediate_output_ttl

        # State that is filled in later.
        self.running = False
        self.uuid = None
        self.final_output = None

        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.submit_runner_ram = submit_runner_ram or 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        """Do nothing in this base class."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from the record's state and exit code,
        tails the log on permanent failure, reads "cwl.output.json" from
        the output collection and reports the outputs through
        arvrunner.output_callback.  Any exception along the way is logged
        and reported as an empty output with status "permanentFail".  The
        record's uuid is always removed from arvrunner.processes at the
        end.
        """

        try:
            processStatus = "permanentFail"
            if record["state"] == "Complete":
                exit_code = record.get("exit_code")
                if exit_code is None:
                    # No exit code recorded: a Complete state counts as
                    # success.
                    processStatus = "success"
                elif exit_code == 33:
                    processStatus = "UnsupportedRequirement"
                elif exit_code == 0:
                    processStatus = "success"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)

            def keepify(fileobj):
                # Locations that are not already keep references are made
                # relative to the output collection.
                path = fileobj["location"]
                if path.startswith("keep:"):
                    return
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            uuid = record["uuid"]
            if uuid in self.arvrunner.processes:
                del self.arvrunner.processes[uuid]
357             if record["uuid"] in self.arvrunner.processes:
358                 del self.arvrunner.processes[record["uuid"]]