# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


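# Drop derived bookkeeping fields ("path", "nameext", "nameroot", "dirname")
# so they are not written back out with File/Directory records.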
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


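# Recursively walk lists and dicts, applying op() to every dict that carries a
# "default" field.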
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

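# Fill in 'secondaryFiles' on a File object (or each File in a list) from the
# secondaryFiles patterns declared on input parameter 't', recording what was
# added in 'discovered' when a dict is supplied.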
def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

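# For each input parameter that declares secondaryFiles and has a value in the
# job order, expand the secondaryFiles of that value.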
def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded;
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

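    # For File/Directory values used as input defaults, normalize 'path' to
    # 'location' and drop any default that points at a file which does not
    # exist locally (it also comes off the upload list).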
    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
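    # Expand secondaryFiles for File defaults declared on tool and workflow
    # inputs, collecting what was added in 'discovered'.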
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k,v in rewrites.items()}

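    # Walk the packed document: map rewritten ids back to their original
    # documents so locations can be resolved through merged_map, restore any
    # discovered secondaryFiles, and record the Docker image collection for
    # each DockerRequirement.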
    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


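# Annotate the packed workflow document with the git commit hash of the
# checkout containing the tool's source file, when that file is local and
# under version control.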
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' pointing to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Filter out the "job_order" entry; cwltool adds it when parameters are
    # provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

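# Per-document record of upload results: 'resolved' maps original locations to
# keep references, 'secondaryFiles' maps resolved locations to discovered
# secondary files.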
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

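    # Upload the file dependencies of each tool document in the workflow and
    # record how its references were rewritten.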
    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


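# Save the packed workflow as "workflow.cwl" in a Keep collection.  If a
# collection with the same content and name prefix already exists in the
# target project, reuse it; otherwise save a new one.  Returns the portable
# data hash.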
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB

        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
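                    # Exit code 33 is treated as UnsupportedRequirement, 0 as
                    # success; any other nonzero exit is a permanent failure.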
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)