# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess
from collections import namedtuple

from StringIO import StringIO

from schema_salad.sourceline import SourceLine, cmap

from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')
def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]

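# Illustrative sketch, not part of the module: a File literal whose
# 'location' was assigned an anonymous "_:" id loses the field, while a
# real reference is left alone (values below are hypothetical).
#
#     f = {"class": "File", "location": "_:b0b9f6f0", "basename": "a.txt"}
#     trim_anonymous_location(f)   # 'location' removed from f
#
#     g = {"class": "File", "location": "keep:<pdh>/a.txt"}
#     trim_anonymous_location(g)   # g is unchanged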

def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]


def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

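# Illustrative sketch, not part of the module: find_defaults walks nested
# lists and dicts and applies op to each dict that carries a "default" key
# (it does not recurse into matched dicts).
#
#     hits = []
#     doc = {"inputs": [{"id": "#main/x", "default": {"class": "File"}}]}
#     find_defaults(doc, hits.append)
#     # hits == [{"id": "#main/x", "default": {"class": "File"}}]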
def setSecondary(t, fileobj, discovered):
    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
        if "secondaryFiles" not in fileobj:
            fileobj["secondaryFiles"] = cmap(
                [{"location": substitute(fileobj["location"], sf), "class": "File"}
                 for sf in t["secondaryFiles"]])
            if discovered is not None:
                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
    elif isinstance(fileobj, list):
        for e in fileobj:
            setSecondary(t, e, discovered)

def discover_secondary_files(inputs, job_order, discovered=None):
    for t in inputs:
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(t, job_order[shortname(t["id"])], discovered)


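# Illustrative sketch, not part of the module: for a tool input declared with
# secondaryFiles patterns, cwltool's substitute() derives the companion
# locations from the primary File's location.  A plain suffix is appended;
# a leading "^" strips one extension first (paths here are hypothetical).
#
#     t = {"id": "#main/bam", "secondaryFiles": [".bai"]}
#     job_order = {"bam": {"class": "File", "location": "keep:<pdh>/sample.bam"}}
#     discover_secondary_files([t], job_order)
#     # job_order["bam"]["secondaryFiles"] ==
#     #     [{"location": "keep:<pdh>/sample.bam.bai", "class": "File"}]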
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run,
                        include_primary=True, discovered_secondaryfiles=None):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc_result = scandeps(uri, scanobj,
                         loadref_fields,
                         set(("$include", "$schemas", "location")),
                         loadref, urljoin=document_loader.fetcher.urljoin)

    sc = []
    def only_real(obj):
        if obj.get("location", "").startswith("file:"):
            sc.append(obj)

    visit_class(sc_result, ("File", "Directory"), only_real)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def visit_default(obj):
        remove = [False]
        def ensure_default_location(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, visit_default)

    discovered = {}
    def discover_default_secondary_files(obj):
        discover_secondary_files(obj["inputs"],
                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
                                 discovered)

    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)

    for d in discovered:
        sc.extend(discovered[d])

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    visit_class(workflowobj, ("File", "Directory"), setloc)
    visit_class(discovered, ("File", "Directory"), setloc)

    if discovered_secondaryfiles is not None:
        for d in discovered:
            discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


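# Illustrative sketch, not part of the module: a typical call site uploads a
# tool document's local references to Keep and then inspects the rewritten
# locations (the name and flag values here are hypothetical).
#
#     mapper = upload_dependencies(arvrunner, "step1 dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False,
#                                  include_primary=False)
#     for src, entry in mapper.items():
#         print src, "->", entry.resolved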
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

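# Illustrative sketch, not part of the module: a DockerRequirement as it
# might appear in a tool document, and the direct image lookup it triggers
# (the image tag here is hypothetical).
#
#     docker_req = {"class": "DockerRequirement", "dockerPull": "ubuntu:16.04"}
#     arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)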
def packed_workflow(arvrunner, tool, merged_map):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document."""

    rewrites = {}
    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)

    rewrite_to_orig = {v: k for k, v in rewrites.items()}

    def visit(v, cur_id):
        if isinstance(v, dict):
            if v.get("class") in ("CommandLineTool", "Workflow"):
                if "id" not in v:
                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
                cur_id = rewrite_to_orig.get(v["id"], v["id"])
            if "location" in v and not v["location"].startswith("keep:"):
                v["location"] = merged_map[cur_id].resolved[v["location"]]
            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
            for l in v:
                visit(v[l], cur_id)
        if isinstance(v, list):
            for l in v:
                visit(l, cur_id)
    visit(packed, None)
    return packed


def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the
    updated input object with 'location' pointing at the proper keep
    references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

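# Illustrative sketch, not part of the module: after upload_job_order, local
# File references in the input object point at Keep instead of the local
# filesystem (paths here are hypothetical).
#
#     job_order = {"ref": {"class": "File", "location": "file:///tmp/ref.fa"}}
#     job_order = upload_job_order(arvrunner, "test input", tool, job_order)
#     # job_order["ref"]["location"] == "keep:<pdh>/ref.fa"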
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    merged_map = {}

    def upload_tool_deps(deptool):
        if "id" in deptool:
            discovered_secondaryfiles = {}
            pm = upload_dependencies(arvrunner,
                                     "%s dependencies" % (shortname(deptool["id"])),
                                     document_loader,
                                     deptool,
                                     deptool["id"],
                                     False,
                                     include_primary=False,
                                     discovered_secondaryfiles=discovered_secondaryfiles)
            document_loader.idx[deptool["id"]] = deptool
            toolmap = {}
            for k, v in pm.items():
                toolmap[k] = v.resolved
            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)

    tool.visit(upload_tool_deps)

    return merged_map

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


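# Illustrative sketch, not part of the module: the lookup above is by
# portable_data_hash, so re-saving a byte-identical packed workflow reuses
# the existing collection rather than creating a duplicate.
#
#     pdh = upload_workflow_collection(arvrunner, "my-workflow", packed)
#     # a second call with the same content logs "Using collection ..." and
#     # returns the same pdh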
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0, merged_map=None, default_storage_classes="default",
                 priority=None, secret_store=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl
        self.priority = priority
        self.secret_store = secret_store
        self.default_storage_classes = default_storage_classes

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

        self.merged_map = merged_map or {}

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)