Merge branch '4019-query-properties' closes #4019
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import urlparse
7 from functools import partial
8 import logging
9 import json
10 import subprocess
11
12 from StringIO import StringIO
13
14 from schema_salad.sourceline import SourceLine
15
16 import cwltool.draft2tool
17 from cwltool.draft2tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
25
26 import arvados.collection
27 import ruamel.yaml as yaml
28
29 from .arvdocker import arv_docker_get_image
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
32 from . import done
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
def trim_anonymous_location(obj):
    """Drop the 'location' field of an anonymous File/Directory literal.

    Literals are given a generated 'location' (prefixed with "_:") so they
    are easier to handle internally.  Writing that generated value back out
    would hurt reproducibility, and a literal is allowed not to have a
    'location' at all, so it is removed here.  Objects whose location does
    not start with "_:" are left untouched.

    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        return
    del obj["location"]
48
def remove_redundant_fields(obj):
    """Delete the "path", "nameext", "nameroot" and "dirname" keys from obj.

    Keys that are not present are ignored.  obj is modified in place.
    """
    redundant = ("path", "nameext", "nameroot", "dirname")
    for key in redundant:
        obj.pop(key, None)
53
def find_defaults(d, op):
    """Recursively visit d and call op on every dict that has a "default" key.

    Lists are searched element by element.  A dict that contains "default"
    is passed to op and is NOT searched any further; a dict without it has
    each of its values searched.  Any other type is ignored.

    d  -- the (possibly nested) list/dict structure to search
    op -- callable invoked as op(dict_with_default)
    """
    if isinstance(d, list):
        for item in d:
            find_defaults(item, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            # values() rather than the Python 2-only itervalues(), so the
            # traversal works on both Python 2 and Python 3 mappings.
            for value in d.values():
                find_defaults(value, op)
64
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns the ArvPathMapper built for the discovered dependencies, and
    rewrites the references inside "workflowobj" in place.

    scandeps is used to discover external references: "$import" (and "run"
    when loadref_run is true) are followed, while "$include", "$schemas" and
    "location" fields are collected.

    When workflowobj has an "id", the document is fetched again as raw text
    so that the scan sees it before any preprocessing.
    """

    already_loaded = set()

    def loadref(b, u):
        # Resolve the reference and drop the fragment; each document is
        # only loaded once, repeated requests get an empty dict.
        target, _ = urlparse.urldefrag(document_loader.fetcher.urljoin(b, u))
        if target in already_loaded:
            return {}
        already_loaded.add(target)
        # fetch_text returns the raw file (before preprocessing).
        raw = document_loader.fetch_text(target)
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return yaml.safe_load(StringIO(raw))

    loadref_fields = set(("$import", "run")) if loadref_run else set(("$import",))

    # Need raw file content (before preprocessing) to ensure
    # that external references in $include and $mixin are captured.
    scanobj = loadref("", workflowobj["id"]) if "id" in workflowobj else workflowobj

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        sc.extend({"class": "File", "location": schema}
                  for schema in workflowobj["$schemas"])

    def capture_default(obj):
        # A list is used as a mutable flag the nested function can update.
        missing = []

        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f.pop("path")
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # The default refers to something that does not exist:
                # drop it from sc (in place) and mark the default for removal.
                sc[:] = [entry for entry in sc if entry["location"] != f["location"]]
                missing.append(True)

        visit_class(obj["default"], ("File", "Directory"), add_default)
        if missing:
            # Delete "default" from workflowobj
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" not in p:
            return
        # Anonymous literals ("_:") and existing keep references stay as is.
        if p["location"].startswith("_:") or p["location"].startswith("keep:"):
            return
        p["location"] = mapper.mapper(p["location"]).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        workflowobj["$schemas"] = [mapper.mapper(schema).resolved
                                   for schema in workflowobj["$schemas"]]

    return mapper
156
157
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects.

    For a CommandLineTool the image named by its DockerRequirement is
    requested through arv_docker_get_image; without a DockerRequirement the
    "arvados/jobs" image is requested instead.  For a Workflow every step's
    embedded tool is processed recursively.  Other tool types are ignored.

    Raises UnsupportedRequirement when the DockerRequirement sets
    'dockerOutputDirectory'.
    """

    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            image_req = docker_req
        else:
            image_req = {"dockerPull": "arvados/jobs"}
        arv_docker_get_image(arvrunner.api, image_req, True, arvrunner.project_uuid)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
174
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.  The document is looked up by the tool's "id"
    through the tool's own document loader and handed to pack().
    """

    loader = tool.doc_loader
    doc_id = tool.tool["id"]
    document = loader.fetch(doc_id)
    return pack(loader, document, doc_id, tool.metadata)
182
def tag_git_version(packed, tool=None):
    """Record the git commit of a local workflow in the packed document.

    When the workflow id is a "file://" URI, 'git log' is run in the
    directory containing that file and the newest commit hash is stored in
    packed["http://schema.org/version"].  This is best effort: if git cannot
    be run or exits with an error, packed is left unchanged.

    packed -- the packed workflow document (dict), updated in place
    tool   -- optional tool object whose tool["id"] names the workflow
              document.  The previous body read an undefined name 'tool'
              and so raised NameError on every call; the parameter is
              optional to keep the one-argument call form working.
    """
    if tool is not None:
        doc_id = tool.tool["id"]
    else:
        # NOTE(review): falling back to the packed document's own "id" is an
        # assumption about what callers intend -- confirm against callers.
        doc_id = packed.get("id") or ""

    if not doc_id.startswith("file://"):
        return

    # Strip the "file://" prefix (7 characters) to get a filesystem path.
    path = os.path.dirname(doc_id[7:])
    try:
        githash = subprocess.check_output(
            ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
            stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # Not a git checkout, git not installed, or directory missing:
        # deliberately skip tagging rather than fail.
        return
    packed["http://schema.org/version"] = githash
192
193
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.

    Before uploading, File inputs whose parameter declares secondaryFiles
    and which do not already list any get a "secondaryFiles" entry derived
    from the parameter's patterns.  The "id" and "job_order" keys are
    removed from job_order before it is returned.
    """

    def set_secondary(param, fileobj):
        # Fill in secondaryFiles on File objects, descending into lists.
        if isinstance(fileobj, dict) and fileobj.get("class") == "File":
            if "secondaryFiles" not in fileobj:
                fileobj["secondaryFiles"] = [
                    {"location": substitute(fileobj["location"], pattern), "class": "File"}
                    for pattern in param["secondaryFiles"]]

        if isinstance(fileobj, list):
            for element in fileobj:
                set_secondary(param, element)

    for param in tool.tool["inputs"]:
        key = shortname(param["id"])
        if key in job_order and param.get("secondaryFiles"):
            set_secondary(param, job_order[key])

    # The returned path mapper is not needed; job_order is updated in place.
    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    job_order.pop("id", None)

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    job_order.pop("job_order", None)

    return job_order
228
def upload_workflow_deps(arvrunner, tool, override_tools):
    """Upload Docker images and file dependencies for every tool in a workflow.

    Each visited tool document that has an "id" has its dependencies
    uploaded (updating the document in place); the updated document is then
    stored back into the document loader's index and its JSON serialization
    is recorded in override_tools under the same id.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return
        tool_id = deptool["id"]
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(tool_id)),
                            document_loader,
                            deptool,
                            tool_id,
                            False,
                            include_primary=False)
        document_loader.idx[tool_id] = deptool
        override_tools[tool_id] = json.dumps(deptool)

    tool.visit(upload_tool_deps)
249
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it.

    Returns img on success.  Any failure from arv_docker_get_image is
    re-raised as an Exception naming the image and the underlying error.
    """

    image_req = {"dockerPull": img}
    try:
        arv_docker_get_image(arvrunner.api, image_req, True, arvrunner.project_uuid)
    except Exception as e:
        message = "Docker image %s is not available\n%s" % (img, e)
        raise Exception(message)
    else:
        return img
258
def upload_workflow_collection(arvrunner, name, packed):
    """Store a packed workflow as "workflow.cwl" in a collection.

    The packed document is serialized as sorted, indented JSON.  If a
    collection with the same portable data hash and a name starting with
    `name` is already listed (restricted to the runner's project when one
    is set), it is reused; otherwise a new collection is saved.

    Returns the portable data hash of the collection.
    """
    retries = arvrunner.num_retries
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=retries)
    serialized = json.dumps(packed, indent=2, sort_keys=True, separators=(',',': '))
    with collection.open("workflow.cwl", "w") as f:
        f.write(serialized)

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if not exists["items"]:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", exists["items"][0]["uuid"])

    return collection.portable_data_hash()
282
283
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        """Store the submission parameters.

        enable_reuse is overridden by the tool's ReuseRequirement when reuse
        was requested and the workflow declares one.  submit_runner_ram
        defaults to 3000 when not given (or zero); a negative value raises
        Exception.  The runner image defaults to "arvados/jobs:" followed by
        this package's version.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        self.submit_runner_ram = submit_runner_ram if submit_runner_ram else 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        """Hook with no behavior in the base class."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from the record's state and exit code,
        prints the tail of the log on failure, loads "cwl.output.json" from
        the output collection and reports the result through the runner's
        output_callback.  Any error while doing so is logged and reported as
        "permanentFail" with empty outputs.  The record's uuid is always
        removed from the runner's process table afterwards.
        """

        def open_collection(locator):
            # All collections here are read with the runner's clients.
            return arvados.collection.CollectionReader(locator,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        try:
            if record["state"] != "Complete":
                processStatus = "permanentFail"
            else:
                exit_code = record.get("exit_code")
                if exit_code is None or exit_code == 0:
                    # A missing exit code on a Complete record counts as success.
                    processStatus = "success"
                elif exit_code == 33:
                    processStatus = "UnsupportedRequirement"
                else:
                    processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = open_collection(record["log"])
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = open_collection(self.final_output)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)

            def keepify(fileobj):
                # Turn locations relative to the output collection into
                # keep: references; existing keep: references are kept.
                path = fileobj["location"]
                if path.startswith("keep:"):
                    return
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]