Merge branch '13619-fed-object-list' closes #13619
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import urlparse
7 from functools import partial
8 import logging
9 import json
10 import subprocess32 as subprocess
11 from collections import namedtuple
12
13 from StringIO import StringIO
14
15 from schema_salad.sourceline import SourceLine, cmap
16
17 from cwltool.command_line_tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
25
26 import arvados.collection
27 import ruamel.yaml as yaml
28
29 from .arvdocker import arv_docker_get_image
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
32 from . import done
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
36 def trim_anonymous_location(obj):
37     """Remove 'location' field from File and Directory literals.
38
39     To make internal handling easier, literals are assigned a random id for
40     'location'.  However, when writing the record back out, this can break
41     reproducibility.  Since it is valid for literals not to have a 'location'
42     field, remove it.
43
44     """
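    # Illustrative sketch (literal invented for this note): a record such as
    #   {"class": "File", "location": "_:af35c9", "basename": "hello.txt", "contents": "hi"}
    # comes back out as
    #   {"class": "File", "basename": "hello.txt", "contents": "hi"}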
45
46     if obj.get("location", "").startswith("_:"):
47         del obj["location"]
48
49
50 def remove_redundant_fields(obj):
51     for field in ("path", "nameext", "nameroot", "dirname"):
52         if field in obj:
53             del obj[field]
54
55
56 def find_defaults(d, op):
57     if isinstance(d, list):
58         for i in d:
59             find_defaults(i, op)
60     elif isinstance(d, dict):
61         if "default" in d:
62             op(d)
63         else:
64             for i in d.itervalues():
65                 find_defaults(i, op)
66
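# A minimal usage sketch for find_defaults() above (assumed, not from the
# original source): collect every entry in a loaded CWL document that carries
# a "default" value.
#
#   defaults = []
#   find_defaults(workflowobj, defaults.append)
#
# Each dict handed to the callback contains a "default" key; once a match is
# found, its children are not descended into.
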
67 def setSecondary(t, fileobj, discovered):
68     if isinstance(fileobj, dict) and fileobj.get("class") == "File":
69         if "secondaryFiles" not in fileobj:
70             fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
71             if discovered is not None:
72                 discovered[fileobj["location"]] = fileobj["secondaryFiles"]
73     elif isinstance(fileobj, list):
74         for e in fileobj:
75             setSecondary(t, e, discovered)
76
77 def discover_secondary_files(inputs, job_order, discovered=None):
78     for t in inputs:
79         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
80             setSecondary(t, job_order[shortname(t["id"])], discovered)
81
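# A minimal sketch of how discover_secondary_files() above expands secondaryFiles
# patterns (input ids and filenames invented for illustration):
#
#   inputs = [{"id": "#main/reads", "type": "File", "secondaryFiles": [".bai"]}]
#   job_order = {"reads": {"class": "File", "location": "file:///data/sample.bam"}}
#   discovered = {}
#   discover_secondary_files(inputs, job_order, discovered)
#   # job_order["reads"]["secondaryFiles"] is now
#   #   [{"location": "file:///data/sample.bam.bai", "class": "File"}]
#   # and the same list is recorded under discovered["file:///data/sample.bam"].
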
82
83 def upload_dependencies(arvrunner, name, document_loader,
84                         workflowobj, uri, loadref_run,
85                         include_primary=True, discovered_secondaryfiles=None):
86     """Upload the dependencies of the workflowobj document to Keep.
87
88     Returns a pathmapper object mapping local paths to keep references.  Also
89     does an in-place update of references in "workflowobj".
90
91     Use scandeps to find $import, $include, $schemas, run, File and Directory
92     fields that represent external references.
93
94     If workflowobj has an "id" field, this will reload the document to ensure
95     it is scanning the raw document prior to preprocessing.
96     """
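    # Rough sketch of the effect (hash invented for illustration): a reference
    # originally written as
    #   {"class": "File", "location": "file:///home/me/wf/input.txt"}
    # is rewritten in place to
    #   {"class": "File", "location": "keep:<portable data hash>/input.txt"}
    # and mapper.mapper("file:///home/me/wf/input.txt").resolved returns the
    # same "keep:..." string.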
97
98     loaded = set()
99     def loadref(b, u):
100         joined = document_loader.fetcher.urljoin(b, u)
101         defrg, _ = urlparse.urldefrag(joined)
102         if defrg not in loaded:
103             loaded.add(defrg)
104             # Use fetch_text to get raw file (before preprocessing).
105             text = document_loader.fetch_text(defrg)
106             if isinstance(text, bytes):
107                 textIO = StringIO(text.decode('utf-8'))
108             else:
109                 textIO = StringIO(text)
110             return yaml.safe_load(textIO)
111         else:
112             return {}
113
114     if loadref_run:
115         loadref_fields = set(("$import", "run"))
116     else:
117         loadref_fields = set(("$import",))
118
119     scanobj = workflowobj
120     if "id" in workflowobj:
121         # Need raw file content (before preprocessing) to ensure
122         # that external references in $include and $mixin are captured.
123         scanobj = loadref("", workflowobj["id"])
124
125     sc_result = scandeps(uri, scanobj,
126                   loadref_fields,
127                   set(("$include", "$schemas", "location")),
128                   loadref, urljoin=document_loader.fetcher.urljoin)
129
130     sc = []
131     def only_real(obj):
132     # Only interested in local files that need to be uploaded;
133     # don't include file literals, keep references, etc.
134         sp = obj.get("location", "").split(":")
135         if len(sp) > 1 and sp[0] in ("file", "http", "https"):
136             sc.append(obj)
137
138     visit_class(sc_result, ("File", "Directory"), only_real)
139
140     normalizeFilesDirs(sc)
141
142     if include_primary and "id" in workflowobj:
143         sc.append({"class": "File", "location": workflowobj["id"]})
144
145     if "$schemas" in workflowobj:
146         for s in workflowobj["$schemas"]:
147             sc.append({"class": "File", "location": s})
148
149     def visit_default(obj):
150         remove = [False]
151         def ensure_default_location(f):
152             if "location" not in f and "path" in f:
153                 f["location"] = f["path"]
154                 del f["path"]
155             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
156                 # Doesn't exist, remove from list of dependencies to upload
157                 sc[:] = [x for x in sc if x["location"] != f["location"]]
158                 # Delete "default" from workflowobj
159                 remove[0] = True
160         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
161         if remove[0]:
162             del obj["default"]
163
164     find_defaults(workflowobj, visit_default)
165
166     discovered = {}
167     def discover_default_secondary_files(obj):
168         discover_secondary_files(obj["inputs"],
169                                  {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
170                                  discovered)
171
172     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
173
174     for d in list(discovered.keys()):
175         # Only interested in discovered secondaryFiles which are local
176         # files that need to be uploaded.
177         if d.startswith("file:"):
178             sc.extend(discovered[d])
179         else:
180             del discovered[d]
181
182     mapper = ArvPathMapper(arvrunner, sc, "",
183                            "keep:%s",
184                            "keep:%s/%s",
185                            name=name,
186                            single_collection=True)
187
188     def setloc(p):
189         if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
190             p["location"] = mapper.mapper(p["location"]).resolved
191
192     visit_class(workflowobj, ("File", "Directory"), setloc)
193     visit_class(discovered, ("File", "Directory"), setloc)
194
195     if discovered_secondaryfiles is not None:
196         for d in discovered:
197             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
198
199     if "$schemas" in workflowobj:
200         sch = []
201         for s in workflowobj["$schemas"]:
202             sch.append(mapper.mapper(s).resolved)
203         workflowobj["$schemas"] = sch
204
205     return mapper
206
207
208 def upload_docker(arvrunner, tool):
209     """Uploads Docker images used in CommandLineTool objects."""
210
211     if isinstance(tool, CommandLineTool):
212         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
213         if docker_req:
214             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
215                 # TODO: can be supported by containers API, but not jobs API.
216                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
217                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
218             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
219         else:
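            # No DockerRequirement on the tool: make sure the default
            # arvados/jobs image is available instead.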
220             arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
221     elif isinstance(tool, cwltool.workflow.Workflow):
222         for s in tool.steps:
223             upload_docker(arvrunner, s.embedded_tool)
224
225
226 def packed_workflow(arvrunner, tool, merged_map):
227     """Create a packed workflow.
228
229     A "packed" workflow is one where all the components have been combined into a single document."""
230
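    # merged_map comes from upload_workflow_deps(): it maps each tool/workflow
    # document id to a FileUpdates tuple whose .resolved dict translates original
    # file locations to keep: URIs and whose .secondaryFiles dict records any
    # secondaryFiles discovered for those (resolved) locations.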
231     rewrites = {}
232     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
233                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
234
235     rewrite_to_orig = {v: k for k,v in rewrites.items()}
236
237     def visit(v, cur_id):
238         if isinstance(v, dict):
239             if v.get("class") in ("CommandLineTool", "Workflow"):
240                 if "id" not in v:
241                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
242                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
243             if "location" in v and not v["location"].startswith("keep:"):
244                 v["location"] = merged_map[cur_id].resolved[v["location"]]
245             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
246                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
247             for l in v:
248                 visit(v[l], cur_id)
249         if isinstance(v, list):
250             for l in v:
251                 visit(l, cur_id)
252     visit(packed, None)
253     return packed
254
255
256 def tag_git_version(packed, tool):
257     if tool.tool["id"].startswith("file://"):
258         path = os.path.dirname(tool.tool["id"][7:])
259         try:
260             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
261         except (OSError, subprocess.CalledProcessError):
262             pass
263         else:
264             packed["http://schema.org/version"] = githash
265
266
267 def upload_job_order(arvrunner, name, tool, job_order):
268     """Upload local files referenced in the input object and return the updated
269     input object with each 'location' pointing at the proper Keep reference.
270     """
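    # Rough sketch (path and hash invented for illustration): an input object like
    #   {"reads": {"class": "File", "location": "file:///home/me/sample.bam"}}
    # is returned with the location rewritten to
    #   {"reads": {"class": "File", "location": "keep:<portable data hash>/sample.bam"}}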
271
272     discover_secondary_files(tool.tool["inputs"], job_order)
273
274     jobmapper = upload_dependencies(arvrunner,
275                                     name,
276                                     tool.doc_loader,
277                                     job_order,
278                                     job_order.get("id", "#"),
279                                     False)
280
281     if "id" in job_order:
282         del job_order["id"]
283
284     # Need to filter this out; it gets added by cwltool when parameters
285     # are provided on the command line.
286     if "job_order" in job_order:
287         del job_order["job_order"]
288
289     return job_order
290
291 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
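# For one tool document: 'resolved' maps original file locations to their keep:
# URIs, and 'secondaryFiles' maps those resolved locations to any secondaryFiles
# discovered for them (see upload_tool_deps below and packed_workflow above).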
292
293 def upload_workflow_deps(arvrunner, tool):
294     # Ensure that Docker images needed by this workflow are available
295
296     upload_docker(arvrunner, tool)
297
298     document_loader = tool.doc_loader
299
300     merged_map = {}
301
302     def upload_tool_deps(deptool):
303         if "id" in deptool:
304             discovered_secondaryfiles = {}
305             pm = upload_dependencies(arvrunner,
306                                      "%s dependencies" % (shortname(deptool["id"])),
307                                      document_loader,
308                                      deptool,
309                                      deptool["id"],
310                                      False,
311                                      include_primary=False,
312                                      discovered_secondaryfiles=discovered_secondaryfiles)
313             document_loader.idx[deptool["id"]] = deptool
314             toolmap = {}
315             for k,v in pm.items():
316                 toolmap[k] = v.resolved
317             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
318
319     tool.visit(upload_tool_deps)
320
321     return merged_map
322
323 def arvados_jobs_image(arvrunner, img):
324     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
325
326     try:
327         arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
328     except Exception as e:
329         raise Exception("Docker image %s is not available\n%s" % (img, e) )
330     return img
331
332 def upload_workflow_collection(arvrunner, name, packed):
333     collection = arvados.collection.Collection(api_client=arvrunner.api,
334                                                keep_client=arvrunner.keep_client,
335                                                num_retries=arvrunner.num_retries)
336     with collection.open("workflow.cwl", "w") as f:
337         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
338
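    # Reuse an existing collection with identical content (same portable data
    # hash) and a matching name prefix; only save a new collection when none
    # is found.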
339     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
340                ["name", "like", name+"%"]]
341     if arvrunner.project_uuid:
342         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
343     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
344
345     if exists["items"]:
346         logger.info("Using collection %s", exists["items"][0]["uuid"])
347     else:
348         collection.save_new(name=name,
349                             owner_uuid=arvrunner.project_uuid,
350                             ensure_unique_name=True,
351                             num_retries=arvrunner.num_retries)
352         logger.info("Uploaded to %s", collection.manifest_locator())
353
354     return collection.portable_data_hash()
355
356
357 class Runner(object):
358     """Base class for runner processes, which submit an instance of
359     arvados-cwl-runner and wait for the final result."""
360
361     def __init__(self, runner, tool, job_order, enable_reuse,
362                  output_name, output_tags, submit_runner_ram=0,
363                  name=None, on_error=None, submit_runner_image=None,
364                  intermediate_output_ttl=0, merged_map=None,
365                  priority=None, secret_store=None):
366         self.arvrunner = runner
367         self.tool = tool
368         self.job_order = job_order
369         self.running = False
370         if enable_reuse:
371             # If reuse is permitted by command line arguments but
372             # disabled by the workflow itself, disable it.
373             reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
374             if reuse_req:
375                 enable_reuse = reuse_req["enableReuse"]
376         self.enable_reuse = enable_reuse
377         self.uuid = None
378         self.final_output = None
379         self.output_name = output_name
380         self.output_tags = output_tags
381         self.name = name
382         self.on_error = on_error
383         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
384         self.intermediate_output_ttl = intermediate_output_ttl
385         self.priority = priority
386         self.secret_store = secret_store
387
388         self.submit_runner_cores = 1
389         self.submit_runner_ram = 1024  # default 1 GiB
390
391         runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
392         if runner_resource_req:
393             if runner_resource_req.get("coresMin"):
394                 self.submit_runner_cores = runner_resource_req["coresMin"]
395             if runner_resource_req.get("ramMin"):
396                 self.submit_runner_ram = runner_resource_req["ramMin"]
397
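        # For reference, runner_resource_req above is populated by a hint of
        # roughly this shape in the workflow document (illustrative sketch; the
        # "arv" prefix is assumed to map to http://arvados.org/cwl#):
        #
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048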
398         if submit_runner_ram:
399             # Command line / initializer overrides default and/or spec from workflow
400             self.submit_runner_ram = submit_runner_ram
401
402         if self.submit_runner_ram <= 0:
403             raise Exception("Value of submit-runner-ram must be greater than zero")
404
405         if self.submit_runner_cores <= 0:
406             raise Exception("Value of submit-runner-cores must be greater than zero")
407
408         self.merged_map = merged_map or {}
409
410     def update_pipeline_component(self, record):
411         pass
412
413     def done(self, record):
414         """Base method for handling a completed runner."""
415
416         try:
417             if record["state"] == "Complete":
418                 if record.get("exit_code") is not None:
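                    # By convention, arvados-cwl-runner exits with code 33 when
                    # the workflow hits an UnsupportedRequirement.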
419                     if record["exit_code"] == 33:
420                         processStatus = "UnsupportedRequirement"
421                     elif record["exit_code"] == 0:
422                         processStatus = "success"
423                     else:
424                         processStatus = "permanentFail"
425                 else:
426                     processStatus = "success"
427             else:
428                 processStatus = "permanentFail"
429
430             outputs = {}
431
432             if processStatus == "permanentFail":
433                 logc = arvados.collection.CollectionReader(record["log"],
434                                                            api_client=self.arvrunner.api,
435                                                            keep_client=self.arvrunner.keep_client,
436                                                            num_retries=self.arvrunner.num_retries)
437                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
438
439             self.final_output = record["output"]
440             outc = arvados.collection.CollectionReader(self.final_output,
441                                                        api_client=self.arvrunner.api,
442                                                        keep_client=self.arvrunner.keep_client,
443                                                        num_retries=self.arvrunner.num_retries)
444             if "cwl.output.json" in outc:
445                 with outc.open("cwl.output.json") as f:
446                     if f.size() > 0:
447                         outputs = json.load(f)
448             def keepify(fileobj):
449                 path = fileobj["location"]
450                 if not path.startswith("keep:"):
451                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
452             adjustFileObjs(outputs, keepify)
453             adjustDirObjs(outputs, keepify)
454         except Exception as e:
455             logger.exception("[%s] While getting final output object: %s", self.name, e)
456             self.arvrunner.output_callback({}, "permanentFail")
457         else:
458             self.arvrunner.output_callback(outputs, processStatus)