# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

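# Illustrative example (comment only): a File literal that was assigned an
# anonymous id, e.g.
#
#     {"class": "File", "location": "_:0c56...", "basename": "a.txt", "contents": "x"}
#
# has its "_:..." location removed by trim_anonymous_location(), while a real
# location such as "keep:..." or "file:///..." is left untouched.
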
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    if isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

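# Illustrative example (comment only): find_defaults() stops descending once it
# reaches a mapping that has a "default" key, so for a fragment like
#
#     {"inputs": [{"id": "x", "type": "File",
#                  "default": {"class": "File", "location": "blub.txt"}}]}
#
# op() is called exactly once, with the whole {"id": "x", ...} input record as
# its argument.
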
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                i = 0
                while i < len(sc):
                    if sc[i]["location"] == f["location"]:
                        del sc[i]
                    else:
                        i += 1
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

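# Illustrative usage sketch (comment only; the variable names are hypothetical).
# A caller holding an ArvCwlRunner instance and a loaded tool might upload that
# tool's own dependencies roughly like this:
#
#     mapper = upload_dependencies(arvrunner, "my-tool dependencies",
#                                  tool.doc_loader, tool.tool, tool.tool["id"],
#                                  False, include_primary=False)
#
# Afterwards mapper.mapper(local_path).resolved yields a "keep:..." reference,
# and File/Directory locations inside tool.tool have been rewritten in place.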

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

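# Illustrative example (comment only): a CommandLineTool whose requirements
# include
#
#     DockerRequirement:
#       dockerPull: ubuntu:xenial
#
# has {"dockerPull": "ubuntu:xenial"} passed to arv_docker_get_image(); for a
# Workflow the function recurses into each step's embedded_tool instead.
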
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

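# Illustrative sketch (comment only): pack() returns a single JSON-serializable
# document; for a multi-file workflow it typically has the shape
#
#     {"cwlVersion": "v1.0",
#      "$graph": [{"class": "Workflow", "id": "#main", ...},
#                 {"class": "CommandLineTool", "id": "#tool.cwl", ...}]}
#
# which is the document upload_workflow_collection() below stores as workflow.cwl.
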
def tag_git_version(packed, tool):
    """Tag the packed workflow with the git commit hash of the tool's source checkout, if available."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

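# Illustrative usage sketch (comment only; this helper is not called from this
# module).  A submitter holding both the packed document and the loaded tool
# could record the source version roughly like this:
#
#     packed = packed_workflow(arvrunner, tool)
#     tag_git_version(packed, tool)
#     # packed now carries "http://schema.org/version" when the tool lives in a
#     # git checkout.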

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields rewritten to the corresponding Keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

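# Illustrative example (comment only): a job order such as
#
#     {"reference": {"class": "File", "location": "hg19.fa"}}
#
# comes back with the local path replaced by a Keep reference along the lines of
#
#     {"reference": {"class": "File", "location": "keep:<portable data hash>/hg19.fa"}}
#
# where the hash shown here is just a placeholder for the uploaded collection's.
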
def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

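# Illustrative usage sketch (comment only):
#
#     override_tools = {}
#     upload_workflow_deps(arvrunner, tool, override_tools)
#
# tool.visit() applies upload_tool_deps to the top-level process and every
# embedded tool, so override_tools ends up mapping each tool "id" to its
# JSON-serialized document with locations rewritten to "keep:" references.
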
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

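# Illustrative usage sketch (comment only):
#
#     img = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)
#
# returns the image name unchanged on success, and raises with the underlying
# error message if the image cannot be found, pulled, or uploaded.
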
def upload_workflow_collection(arvrunner, name, packed):
    """Save the packed workflow to a Keep collection and return its portable data hash.

    Reuses an existing collection with the same content and a matching name
    prefix when one is found, otherwise creates a new collection."""

    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

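# Illustrative usage sketch (comment only):
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#
# pdh is the collection's portable data hash; the packed document is stored in
# that collection as "workflow.cwl".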

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]