[arvados.git] / sdk / cwl / arvados_cwl / runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
37     """Remove 'location' field from File and Directory literals.
38
39     To make internal handling easier, literals are assigned a random id for
40     'location'.  However, when writing the record back out, this can break
41     reproducibility.  Since it is valid for literals not have a 'location'
42     field, remove it.
43
44     """
45
46     if obj.get("location", "").startswith("_:"):
47         del obj["location"]
48
49
50 def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
86     """Upload the dependencies of the workflowobj document to Keep.
87
88     Returns a pathmapper object mapping local paths to keep references.  Also
89     does an in-place update of references in "workflowobj".
90
91     Use scandeps to find $import, $include, $schemas, run, File and Directory
92     fields that represent external references.
93
94     If workflowobj has an "id" field, this will reload the document to ensure
95     it is scanning the raw document prior to preprocessing.
96     """
97
    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded;
        # don't include file literals, keep references, etc.
        if obj.get("location", "").startswith("file:"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
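        # Use a one-element list as a mutable flag so that the nested
        # ensure_default_location() can signal removal to this scope
        # (this module is Python 2, so 'nonlocal' is not available).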
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

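    # Also apply declared secondaryFiles patterns to the default values of
    # CommandLineTool/Workflow inputs, so that secondary files belonging to
    # those defaults get uploaded as well.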
    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

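    # ArvPathMapper uploads everything collected in sc to Keep (as a single
    # collection, since single_collection=True) and records the resulting
    # keep: locations; the two format strings are the templates used for
    # collection-level and file-within-collection references respectively.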
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

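    # pack() records in 'rewrites' how it renamed embedded tool/workflow ids;
    # invert that mapping so packed ids can be looked up in merged_map, which
    # is keyed by the original (pre-pack) document ids.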
    rewrite_to_orig = {v: k for k,v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return an updated
    input object with each 'location' rewritten to the proper keep reference.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Filter out "job_order"; cwltool adds it when parameters are
    # provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

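# FileUpdates captures, per tool document: the mapping of original file
# locations to their resolved keep: locations ('resolved'), and any
# secondaryFiles discovered for them, keyed by resolved primary location
# ('secondaryFiles').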
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
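    """Upload Docker images and file dependencies for every tool in the
    workflow, returning a dict that maps each tool document id to a
    FileUpdates describing how its references were rewritten."""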
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k,v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

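    # If a collection with the same content (portable data hash) and a
    # matching name already exists in the target project, reuse it instead
    # of saving a new one.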
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB

        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]

        if submit_runner_ram:
            # Command line / initializer value overrides the default and/or the workflow's spec
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
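                    # By convention, an exit code of 33 from the submitted
                    # runner signals an UnsupportedRequirement error.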
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
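            # Rewrite relative paths in the output object into full keep:
            # references rooted at the output collection.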
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)