Merge branch '12061-flaky-tests'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
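
    Illustrative example (the anonymous id below is made up):

        literal = {"class": "File", "location": "_:1a2b3c", "basename": "a.txt",
                   "contents": "hello"}
        trim_anonymous_location(literal)
        # literal == {"class": "File", "basename": "a.txt", "contents": "hello"}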

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
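    """Remove 'path', 'nameext', 'nameroot' and 'dirname' fields from a File or Directory record."""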
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
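    """Recursively walk d and call op() on each mapping that contains a 'default' key."""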
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
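    """Attach a 'secondaryFiles' list to fileobj based on the patterns in t['secondaryFiles'].

    The generated list is also recorded in 'discovered' (keyed by the primary
    file's location) when a dict is supplied.
    """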
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
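    """Apply setSecondary() to each job_order value whose input parameter declares secondaryFiles."""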
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
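
    Illustrative sketch (assumes "arvrunner" is an initialized ArvCwlRunner
    and "tool" is a process object loaded by cwltool; the names are examples
    only):

        mapper = upload_dependencies(arvrunner, "my-workflow dependencies",
                                     tool.doc_loader, tool.tool,
                                     tool.tool["id"], False)
        for src, entry in mapper.items():
            print("%s -> %s" % (src, entry.resolved))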
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in discovered:
        sc.extend(discovered[d])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

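    # Illustrative call sequence (a sketch, not executed here): merged_map is
    # normally the value returned by upload_workflow_deps(), e.g.
    #
    #     merged_map = upload_workflow_deps(arvrunner, tool)
    #     packed = packed_workflow(arvrunner, tool, merged_map)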
    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with 'location' pointing to the corresponding Keep references.
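
    Illustrative input mapping (a sketch; the Keep locator shown is made up):

        {"reads": {"class": "File", "location": "/tmp/reads.fastq"}}

    becomes, after upload:

        {"reads": {"class": "File",
                   "location": "keep:99999999999999999999999999999999+118/reads.fastq"}}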
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # cwltool adds "job_order" to the input object when parameters are
    # provided on the command line; filter it out.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

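# FileUpdates records, for a single tool document, the mapping of original file
# locations to resolved Keep locations ("resolved") and the secondaryFiles
# discovered for those locations ("secondaryFiles").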
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
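    """Upload the dependencies of every tool used by the workflow.

    Returns a dict mapping each tool document "id" to a FileUpdates tuple,
    suitable for passing to packed_workflow() as "merged_map".
    """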
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
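    """Save the packed workflow document to Keep as "workflow.cwl".

    Reuses an existing collection with the same content when one is found,
    otherwise saves a new collection; returns the portable data hash either way.
    """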
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, priority=None,
                 secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
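                    # Exit code 33 is used by the runner to report an
                    # UnsupportedRequirement.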
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)