Merge branch 'master' into 13804-no-shutdown-wanted-nodes
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import urlparse
7 from functools import partial
8 import logging
9 import json
10 import subprocess32 as subprocess
11 from collections import namedtuple
12
13 from StringIO import StringIO
14
15 from schema_salad.sourceline import SourceLine, cmap
16
17 from cwltool.command_line_tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
25
26 import arvados.collection
27 import ruamel.yaml as yaml
28
29 from .arvdocker import arv_docker_get_image
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
32 from . import done
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
36 def trim_anonymous_location(obj):
37     """Remove 'location' field from File and Directory literals.
38
39     To make internal handling easier, literals are assigned a random id for
40     'location'.  However, when writing the record back out, this can break
41     reproducibility.  Since it is valid for literals not have a 'location'
42     field, remove it.
43
44     """
45
46     if obj.get("location", "").startswith("_:"):
47         del obj["location"]
48
49
50 def remove_redundant_fields(obj):
51     for field in ("path", "nameext", "nameroot", "dirname"):
52         if field in obj:
53             del obj[field]
54
55
56 def find_defaults(d, op):
57     if isinstance(d, list):
58         for i in d:
59             find_defaults(i, op)
60     elif isinstance(d, dict):
61         if "default" in d:
62             op(d)
63         else:
64             for i in d.itervalues():
65                 find_defaults(i, op)
66
67 def setSecondary(t, fileobj, discovered):
68     if isinstance(fileobj, dict) and fileobj.get("class") == "File":
69         if "secondaryFiles" not in fileobj:
70             fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
71             if discovered is not None:
72                 discovered[fileobj["location"]] = fileobj["secondaryFiles"]
73     elif isinstance(fileobj, list):
74         for e in fileobj:
75             setSecondary(t, e, discovered)
76
77 def discover_secondary_files(inputs, job_order, discovered=None):
78     for t in inputs:
79         if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
80             setSecondary(t, job_order[shortname(t["id"])], discovered)
81
82
83 def upload_dependencies(arvrunner, name, document_loader,
84                         workflowobj, uri, loadref_run,
85                         include_primary=True, discovered_secondaryfiles=None):
86     """Upload the dependencies of the workflowobj document to Keep.
87
88     Returns a pathmapper object mapping local paths to keep references.  Also
89     does an in-place update of references in "workflowobj".
90
91     Use scandeps to find $import, $include, $schemas, run, File and Directory
92     fields that represent external references.
93
94     If workflowobj has an "id" field, this will reload the document to ensure
95     it is scanning the raw document prior to preprocessing.
96     """
97
98     loaded = set()
99     def loadref(b, u):
100         joined = document_loader.fetcher.urljoin(b, u)
101         defrg, _ = urlparse.urldefrag(joined)
102         if defrg not in loaded:
103             loaded.add(defrg)
104             # Use fetch_text to get raw file (before preprocessing).
105             text = document_loader.fetch_text(defrg)
106             if isinstance(text, bytes):
107                 textIO = StringIO(text.decode('utf-8'))
108             else:
109                 textIO = StringIO(text)
110             return yaml.safe_load(textIO)
111         else:
112             return {}
113
114     if loadref_run:
115         loadref_fields = set(("$import", "run"))
116     else:
117         loadref_fields = set(("$import",))
118
119     scanobj = workflowobj
120     if "id" in workflowobj:
121         # Need raw file content (before preprocessing) to ensure
122         # that external references in $include and $mixin are captured.
123         scanobj = loadref("", workflowobj["id"])
124
125     sc_result = scandeps(uri, scanobj,
126                   loadref_fields,
127                   set(("$include", "$schemas", "location")),
128                   loadref, urljoin=document_loader.fetcher.urljoin)
129
130     sc = []
131     def only_real(obj):
132         if obj.get("location", "").startswith("file:"):
133             sc.append(obj)
134
135     visit_class(sc_result, ("File", "Directory"), only_real)
136
137     normalizeFilesDirs(sc)
138
139     if include_primary and "id" in workflowobj:
140         sc.append({"class": "File", "location": workflowobj["id"]})
141
142     if "$schemas" in workflowobj:
143         for s in workflowobj["$schemas"]:
144             sc.append({"class": "File", "location": s})
145
146     def visit_default(obj):
147         remove = [False]
148         def ensure_default_location(f):
149             if "location" not in f and "path" in f:
150                 f["location"] = f["path"]
151                 del f["path"]
152             if "location" in f and not arvrunner.fs_access.exists(f["location"]):
153                 # Doesn't exist, remove from list of dependencies to upload
154                 sc[:] = [x for x in sc if x["location"] != f["location"]]
155                 # Delete "default" from workflowobj
156                 remove[0] = True
157         visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
158         if remove[0]:
159             del obj["default"]
160
161     find_defaults(workflowobj, visit_default)
162
163     discovered = {}
164     def discover_default_secondary_files(obj):
165         discover_secondary_files(obj["inputs"],
166                                  {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
167                                  discovered)
168
169     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
170
171     for d in discovered:
172         sc.extend(discovered[d])
173
174     mapper = ArvPathMapper(arvrunner, sc, "",
175                            "keep:%s",
176                            "keep:%s/%s",
177                            name=name,
178                            single_collection=True)
179
180     def setloc(p):
181         if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
182             p["location"] = mapper.mapper(p["location"]).resolved
183
184     visit_class(workflowobj, ("File", "Directory"), setloc)
185     visit_class(discovered, ("File", "Directory"), setloc)
186
187     if discovered_secondaryfiles is not None:
188         for d in discovered:
189             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
190
191     if "$schemas" in workflowobj:
192         sch = []
193         for s in workflowobj["$schemas"]:
194             sch.append(mapper.mapper(s).resolved)
195         workflowobj["$schemas"] = sch
196
197     return mapper
198
199
200 def upload_docker(arvrunner, tool):
201     """Uploads Docker images used in CommandLineTool objects."""
202
203     if isinstance(tool, CommandLineTool):
204         (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
205         if docker_req:
206             if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
207                 # TODO: can be supported by containers API, but not jobs API.
208                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
209                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
210             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
211         else:
212             arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
213     elif isinstance(tool, cwltool.workflow.Workflow):
214         for s in tool.steps:
215             upload_docker(arvrunner, s.embedded_tool)
216
217
218 def packed_workflow(arvrunner, tool, merged_map):
219     """Create a packed workflow.
220
221     A "packed" workflow is one where all the components have been combined into a single document."""
222
223     rewrites = {}
224     packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
225                   tool.tool["id"], tool.metadata, rewrite_out=rewrites)
226
227     rewrite_to_orig = {v: k for k,v in rewrites.items()}
228
229     def visit(v, cur_id):
230         if isinstance(v, dict):
231             if v.get("class") in ("CommandLineTool", "Workflow"):
232                 if "id" not in v:
233                     raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
234                 cur_id = rewrite_to_orig.get(v["id"], v["id"])
235             if "location" in v and not v["location"].startswith("keep:"):
236                 v["location"] = merged_map[cur_id].resolved[v["location"]]
237             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
238                 v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
239             for l in v:
240                 visit(v[l], cur_id)
241         if isinstance(v, list):
242             for l in v:
243                 visit(l, cur_id)
244     visit(packed, None)
245     return packed
246
247
248 def tag_git_version(packed):
249     if tool.tool["id"].startswith("file://"):
250         path = os.path.dirname(tool.tool["id"][7:])
251         try:
252             githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
253         except (OSError, subprocess.CalledProcessError):
254             pass
255         else:
256             packed["http://schema.org/version"] = githash
257
258
259 def upload_job_order(arvrunner, name, tool, job_order):
260     """Upload local files referenced in the input object and return updated input
261     object with 'location' updated to the proper keep references.
262     """
263
264     discover_secondary_files(tool.tool["inputs"], job_order)
265
266     jobmapper = upload_dependencies(arvrunner,
267                                     name,
268                                     tool.doc_loader,
269                                     job_order,
270                                     job_order.get("id", "#"),
271                                     False)
272
273     if "id" in job_order:
274         del job_order["id"]
275
276     # Need to filter this out, gets added by cwltool when providing
277     # parameters on the command line.
278     if "job_order" in job_order:
279         del job_order["job_order"]
280
281     return job_order
282
283 FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
284
285 def upload_workflow_deps(arvrunner, tool):
286     # Ensure that Docker images needed by this workflow are available
287
288     upload_docker(arvrunner, tool)
289
290     document_loader = tool.doc_loader
291
292     merged_map = {}
293
294     def upload_tool_deps(deptool):
295         if "id" in deptool:
296             discovered_secondaryfiles = {}
297             pm = upload_dependencies(arvrunner,
298                                      "%s dependencies" % (shortname(deptool["id"])),
299                                      document_loader,
300                                      deptool,
301                                      deptool["id"],
302                                      False,
303                                      include_primary=False,
304                                      discovered_secondaryfiles=discovered_secondaryfiles)
305             document_loader.idx[deptool["id"]] = deptool
306             toolmap = {}
307             for k,v in pm.items():
308                 toolmap[k] = v.resolved
309             merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
310
311     tool.visit(upload_tool_deps)
312
313     return merged_map
314
315 def arvados_jobs_image(arvrunner, img):
316     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
317
318     try:
319         arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
320     except Exception as e:
321         raise Exception("Docker image %s is not available\n%s" % (img, e) )
322     return img
323
324 def upload_workflow_collection(arvrunner, name, packed):
325     collection = arvados.collection.Collection(api_client=arvrunner.api,
326                                                keep_client=arvrunner.keep_client,
327                                                num_retries=arvrunner.num_retries)
328     with collection.open("workflow.cwl", "w") as f:
329         f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
330
331     filters = [["portable_data_hash", "=", collection.portable_data_hash()],
332                ["name", "like", name+"%"]]
333     if arvrunner.project_uuid:
334         filters.append(["owner_uuid", "=", arvrunner.project_uuid])
335     exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
336
337     if exists["items"]:
338         logger.info("Using collection %s", exists["items"][0]["uuid"])
339     else:
340         collection.save_new(name=name,
341                             owner_uuid=arvrunner.project_uuid,
342                             ensure_unique_name=True,
343                             num_retries=arvrunner.num_retries)
344         logger.info("Uploaded to %s", collection.manifest_locator())
345
346     return collection.portable_data_hash()
347
348
349 class Runner(object):
350     """Base class for runner processes, which submit an instance of
351     arvados-cwl-runner and wait for the final result."""
352
353     def __init__(self, runner, tool, job_order, enable_reuse,
354                  output_name, output_tags, submit_runner_ram=0,
355                  name=None, on_error=None, submit_runner_image=None,
356                  intermediate_output_ttl=0, merged_map=None,
357                  priority=None, secret_store=None):
358         self.arvrunner = runner
359         self.tool = tool
360         self.job_order = job_order
361         self.running = False
362         if enable_reuse:
363             # If reuse is permitted by command line arguments but
364             # disabled by the workflow itself, disable it.
365             reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
366             if reuse_req:
367                 enable_reuse = reuse_req["enableReuse"]
368         self.enable_reuse = enable_reuse
369         self.uuid = None
370         self.final_output = None
371         self.output_name = output_name
372         self.output_tags = output_tags
373         self.name = name
374         self.on_error = on_error
375         self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
376         self.intermediate_output_ttl = intermediate_output_ttl
377         self.priority = priority
378         self.secret_store = secret_store
379
380         self.submit_runner_cores = 1
381         self.submit_runner_ram = 1024  # defaut 1 GiB
382
383         runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
384         if runner_resource_req:
385             if runner_resource_req.get("coresMin"):
386                 self.submit_runner_cores = runner_resource_req["coresMin"]
387             if runner_resource_req.get("ramMin"):
388                 self.submit_runner_ram = runner_resource_req["ramMin"]
389
390         if submit_runner_ram:
391             # Command line / initializer overrides default and/or spec from workflow
392             self.submit_runner_ram = submit_runner_ram
393
394         if self.submit_runner_ram <= 0:
395             raise Exception("Value of submit-runner-ram must be greater than zero")
396
397         if self.submit_runner_cores <= 0:
398             raise Exception("Value of submit-runner-cores must be greater than zero")
399
400         self.merged_map = merged_map or {}
401
402     def update_pipeline_component(self, record):
403         pass
404
405     def done(self, record):
406         """Base method for handling a completed runner."""
407
408         try:
409             if record["state"] == "Complete":
410                 if record.get("exit_code") is not None:
411                     if record["exit_code"] == 33:
412                         processStatus = "UnsupportedRequirement"
413                     elif record["exit_code"] == 0:
414                         processStatus = "success"
415                     else:
416                         processStatus = "permanentFail"
417                 else:
418                     processStatus = "success"
419             else:
420                 processStatus = "permanentFail"
421
422             outputs = {}
423
424             if processStatus == "permanentFail":
425                 logc = arvados.collection.CollectionReader(record["log"],
426                                                            api_client=self.arvrunner.api,
427                                                            keep_client=self.arvrunner.keep_client,
428                                                            num_retries=self.arvrunner.num_retries)
429                 done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
430
431             self.final_output = record["output"]
432             outc = arvados.collection.CollectionReader(self.final_output,
433                                                        api_client=self.arvrunner.api,
434                                                        keep_client=self.arvrunner.keep_client,
435                                                        num_retries=self.arvrunner.num_retries)
436             if "cwl.output.json" in outc:
437                 with outc.open("cwl.output.json") as f:
438                     if f.size() > 0:
439                         outputs = json.load(f)
440             def keepify(fileobj):
441                 path = fileobj["location"]
442                 if not path.startswith("keep:"):
443                     fileobj["location"] = "keep:%s/%s" % (record["output"], path)
444             adjustFileObjs(outputs, keepify)
445             adjustDirObjs(outputs, keepify)
446         except Exception as e:
447             logger.exception("[%s] While getting final output object: %s", self.name, e)
448             self.arvrunner.output_callback({}, "permanentFail")
449         else:
450             self.arvrunner.output_callback(outputs, processStatus)