# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

def remove_redundant_fields(obj):
    """Strip fields that cwltool derives automatically ('path', 'nameext', 'nameroot', 'dirname')."""
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]

def find_defaults(d, op):
    """Recursively apply op() to every mapping in d that carries a 'default' key."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)
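
# Example (sketch): given
#     doc = {"inputs": [{"id": "x", "default": {"class": "File", "path": "a.txt"}}]}
# find_defaults(doc, op) invokes op() exactly once, on the inner mapping that
# carries the "default" key.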

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
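
    Example (sketch; assumes an initialized ArvCwlRunner as arvrunner and a
    loaded cwltool tool object as tool):

        mapper = upload_dependencies(arvrunner, "my workflow dependencies",
                                     tool.doc_loader, tool.tool,
                                     tool.tool["id"], False,
                                     include_primary=False)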
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

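    # If a File/Directory default value refers to a path that does not exist
    # locally, drop the default (and its entry in sc) rather than failing the
    # upload; the workflow can still run if the caller supplies that input.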
    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

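    # Rewrite each location to its mapped keep: reference, leaving anonymous
    # literals ("_:...") and existing keep: references alone.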
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Upload the Docker images used by the given tool, recursing into workflow steps."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

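# Stamp the packed document with the git commit hash of the checkout that
# contains the workflow source, under the "http://schema.org/version" key.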
def tag_git_version(tool, packed):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields pointing at the corresponding Keep
    references.
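
    Example (sketch; assumes an initialized ArvCwlRunner as arvrunner):

        job_order = upload_job_order(arvrunner, "myjob input", tool, job_order)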
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf),
                                                  "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
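            # Stash the keep-ified tool document so it can stand in for the
            # original source when the workflow is submitted.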
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

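# Write the packed workflow into a collection as "workflow.cwl"; if a
# collection with the same content (portable data hash) and name prefix
# already exists in the target project, reuse it instead of saving a new one.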
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
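        # Default to the arvados/jobs image matching this SDK version unless
        # the caller supplies an explicit image.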
        self.jobs_image = submit_runner_image or "arvados/jobs:" + __version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
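                    # By convention, arvados-cwl-runner exits 33 when the
                    # workflow contained an unsupported requirement.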
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]