# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]


def remove_redundant_fields(obj):
    """Strip 'path', 'nameext', 'nameroot' and 'dirname', which cwltool can regenerate and which don't need to be written back out."""
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    """Recursively apply 'op' to every mapping in the document that carries a 'default' field."""
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

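# Illustrative use of find_defaults() (hypothetical callback, not from this
# file): find_defaults(workflowobj, lambda d: d.pop("default", None)) would
# strip every default value from a loaded document.  The real callback used
# here is visit_default() inside upload_dependencies() below.
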
67 def setSecondary(t, fileobj, discovered):
68     if isinstance(fileobj, dict) and fileobj.get("class") == "File":
69         if "secondaryFiles" not in fileobj:
70             fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
71             if discovered is not None:
72                 discovered[fileobj["location"]] = fileobj["secondaryFiles"]
73     elif isinstance(fileobj, list):
74         for e in fileobj:
75             setSecondary(t, e, discovered)
76
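# setSecondary() above expands CWL secondaryFiles patterns with cwltool's
# substitute(): a plain suffix is appended to the primary location
# ("reads.bam" + ".bai" -> "reads.bam.bai"), while each leading "^" strips one
# extension first ("reads.bam" + "^.bai" -> "reads.bai").  The file names here
# are illustrative only.
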
def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to Keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded;
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered.keys()):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

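    # The "keep:%s" / "keep:%s/%s" templates above turn uploaded dependencies
    # into "keep:<portable_data_hash>" (whole collection) and
    # "keep:<portable_data_hash>/<path>" (file within a collection) references;
    # single_collection=True gathers all dependencies into one new collection.
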
    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Upload Docker images used in CommandLineTool objects, recursing into Workflow steps."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    """Annotate the packed workflow with the git commit hash of the tool's source checkout."""
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' fields rewritten to the proper Keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out; it gets added by cwltool when parameters are
    # provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

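# Because the lookup above filters on portable_data_hash, re-running with an
# identical packed workflow reuses the existing collection rather than saving a
# duplicate; the name filter only narrows the search to similarly named copies.
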

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

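    # Illustrative CWL hint (assumed shape, not taken from this file) that would
    # populate the WorkflowRunnerResources fields read above; ramMin is in MiB,
    # and "arv" stands for the http://arvados.org/cwl# namespace:
    #
    #   hints:
    #     arv:WorkflowRunnerResources:
    #       ramMin: 2048
    #       coresMin: 2
    #       keep_cache: 512
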
    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # Exit code 33 is treated as an unsupported workflow
                        # requirement rather than a generic failure.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)