# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems

import os
import urllib.parse
from functools import partial
import logging
import json
import subprocess32 as subprocess
from collections import namedtuple

from io import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

import arvados_cwl.arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

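# A minimal sketch of the effect, using a hypothetical File literal:
#
#   f = {"class": "File", "location": "_:a1b2c3", "basename": "a.txt"}
#   trim_anonymous_location(f)
#   # f is now {"class": "File", "basename": "a.txt"}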

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    # Recursively apply op() to every dict in d that has a "default" field.
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in viewvalues(d):
                find_defaults(i, op)

def setSecondary(t, fileobj, discovered):
    # Expand the secondaryFiles patterns of input parameter "t" into concrete
    # File entries on fileobj, recording them in "discovered" when provided.
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    # Fill in expected secondary files for each File value in job_order.
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)

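# For reference, cwltool's substitute() implements the CWL secondaryFiles
# pattern rules; a leading "^" strips one extension from the primary file,
# e.g. (hypothetical paths):
#
#   substitute("file:///data/reads.bam", ".bai")   -> "file:///data/reads.bam.bai"
#   substitute("file:///data/reads.bam", "^.bai")  -> "file:///data/reads.bai"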

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urllib.parse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        # Only interested in local files that need to be uploaded;
        # don't include file literals, keep references, etc.
        sp = obj.get("location", "").split(":")
        if len(sp) > 1 and sp[0] in ("file", "http", "https"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in list(discovered):
        # Only interested in discovered secondaryFiles which are local
        # files that need to be uploaded.
        if d.startswith("file:"):
            sc.extend(discovered[d])
        else:
            del discovered[d]

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper

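# A minimal usage sketch (hypothetical paths), mirroring the call in
# upload_tool_deps() below:
#
#   mapper = upload_dependencies(arvrunner, "wf.cwl dependencies",
#                                tool.doc_loader, workflowobj,
#                                workflowobj["id"], False)
#   mapper.mapper("file:///home/user/wf/input.txt").resolved
#   # -> "keep:<portable data hash>/input.txt"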

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

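# For example, a tool declaring (hypothetical image name):
#
#   requirements:
#     DockerRequirement:
#       dockerPull: myrepo/mytool:1.0
#
# has that image pulled and stored in Keep, while tools with no
# DockerRequirement fall back to the default arvados/jobs image.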

def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in viewitems(rewrites)}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            if v.get("class") == "DockerRequirement":
                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed

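# For a workflow with referenced or embedded tools, cwltool's pack() gathers
# every process under "$graph" in a single document, e.g. (sketch):
#
#   {"cwlVersion": "v1.0",
#    "$graph": [{"class": "Workflow", "id": "#main", ...},
#               {"class": "CommandLineTool", "id": "#tool.cwl", ...}]}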

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash.decode('utf-8')


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the
    updated input object with 'location' fields pointing at the corresponding
    Keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

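# A minimal sketch of the transformation, assuming a hypothetical input:
#
#   {"ref_genome": {"class": "File", "location": "file:///data/hg38.fa"}}
# becomes
#   {"ref_genome": {"class": "File", "location": "keep:<pdh>/hg38.fa"}}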
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

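# The returned merged_map is keyed by tool/workflow document id; each
# FileUpdates entry maps original file locations to their Keep equivalents,
# e.g. (hypothetical):
#
#   {"file:///home/user/wf/wf.cwl": FileUpdates(
#       resolved={"file:///home/user/wf/script.py": "keep:<pdh>/script.py"},
#       secondaryFiles={})}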
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))


def upload_workflow_collection(arvrunner, name, packed):
    """Save the packed workflow document as workflow.cwl in a collection.

    Reuses an existing collection with the same content and a matching name
    when one is found; returns the collection's portable data hash.
    """
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()

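# The portable data hash identifies the collection by content: the md5 of
# the collection manifest plus its size, e.g. "<md5hex>+<size>".  Because the
# lookup above filters on it, resubmitting an identical packed workflow under
# the same name reuses the existing collection instead of re-uploading.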

class Runner(Process):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, loadingContext, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None,
                 priority=None, secret_store=None,
                 collection_cache_size=256,
                 collection_cache_is_default=True):

        super(Runner, self).__init__(tool.tool, loadingContext)

        self.arvrunner = runner
        self.embedded_tool = tool
        self.job_order = None
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store

        self.submit_runner_cores = 1
        self.submit_runner_ram = 1024  # default 1 GiB
        self.collection_cache_size = collection_cache_size

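        # A workflow can override these defaults with the Arvados-specific
        # WorkflowRunnerResources hint (with arv: mapped to
        # http://arvados.org/cwl# in $namespaces), e.g. (sketch):
        #
        #   hints:
        #     arv:WorkflowRunnerResources:
        #       coresMin: 2
        #       ramMin: 2048
        #       keep_cache: 512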
        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
        if runner_resource_req:
            if runner_resource_req.get("coresMin"):
                self.submit_runner_cores = runner_resource_req["coresMin"]
            if runner_resource_req.get("ramMin"):
                self.submit_runner_ram = runner_resource_req["ramMin"]
            if runner_resource_req.get("keep_cache") and collection_cache_is_default:
                self.collection_cache_size = runner_resource_req["keep_cache"]

        if submit_runner_ram:
            # Command line / initializer overrides default and/or spec from workflow
            self.submit_runner_ram = submit_runner_ram

        if self.submit_runner_ram <= 0:
            raise Exception("Value of submit-runner-ram must be greater than zero")

        if self.submit_runner_cores <= 0:
            raise Exception("Value of submit-runner-cores must be greater than zero")

        self.merged_map = merged_map or {}

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Any, None, None]
        self.job_order = job_order
        self._init_job(job_order, runtimeContext)
        yield self

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
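                    # Exit code 33 is the arvados-cwl-runner convention for
                    # a workflow that failed with an UnsupportedRequirement.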
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json", "rb") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)