Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import urlparse
7 from functools import partial
8 import logging
9 import json
10 import subprocess
11
12 from StringIO import StringIO
13
14 from schema_salad.sourceline import SourceLine
15
16 import cwltool.draft2tool
17 from cwltool.draft2tool import CommandLineTool
18 import cwltool.workflow
19 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
20 from cwltool.load_tool import fetch_document
21 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
22 from cwltool.utils import aslist
23 from cwltool.builder import substitute
24 from cwltool.pack import pack
25
26 import arvados.collection
27 import ruamel.yaml as yaml
28
29 from .arvdocker import arv_docker_get_image
30 from .pathmapper import ArvPathMapper, trim_listing
31 from ._version import __version__
32 from . import done
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
def trim_anonymous_location(obj):
    """Drop the 'location' field of a File or Directory literal.

    Literals are given a random '_:' prefixed id as their 'location' to
    simplify internal handling.  Writing that id back out would hurt
    reproducibility, and a literal is allowed to have no 'location' at
    all, so the field is removed when it carries such an id.

    The object is modified in place; nothing is returned.
    """

    location = obj.get("location", "")
    if not location.startswith("_:"):
        # Real (non-anonymous) location, or no location at all: keep as is.
        return
    del obj["location"]
48
def remove_redundant_fields(obj):
    """Delete the "path", "nameext", "nameroot" and "dirname" keys from obj.

    Keys that are absent are ignored.  The object is modified in place.
    """
    present = [key for key in ("path", "nameext", "nameroot", "dirname")
               if key in obj]
    for key in present:
        del obj[key]
53
def find_defaults(d, op):
    """Call op on every dict within d that has a "default" key.

    Lists are searched element by element and dicts value by value.  A
    dict that itself contains "default" is passed to op and is not
    searched any further.  Values that are neither lists nor dicts are
    ignored.

    d  -- the (possibly nested) list/dict structure to search
    op -- callable taking the matching dict as its only argument
    """
    if isinstance(d, list):
        for item in d:
            find_defaults(item, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            # values() is available on both Python 2 and Python 3 dicts,
            # unlike itervalues() which exists only on Python 2.
            for value in d.values():
                find_defaults(value, op)
64
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the external dependencies of workflowobj to Keep.

    scandeps is used to collect $import, $include, $schemas, run (only
    when loadref_run is true), File and Directory references.  The
    references found are handed to an ArvPathMapper, and the 'location'
    and '$schemas' entries of workflowobj are rewritten in place with the
    mapper's resolved values.

    When workflowobj has an "id" field the document is fetched again so
    that the raw text, prior to preprocessing, is what gets scanned.

    Returns the ArvPathMapper that was created.
    """

    already_fetched = set()

    def loadref(base, ref):
        # Resolve the reference and strip any fragment; each document is
        # parsed at most once, later requests get an empty mapping.
        target, _ = urlparse.urldefrag(document_loader.fetcher.urljoin(base, ref))
        if target in already_fetched:
            return {}
        already_fetched.add(target)
        # Use fetch_text to get raw file (before preprocessing).
        raw = document_loader.fetch_text(target)
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return yaml.safe_load(StringIO(raw))

    loadref_fields = set(("$import", "run")) if loadref_run else set(("$import",))

    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])
    else:
        scanobj = workflowobj

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        sc.extend({"class": "File", "location": schema}
                  for schema in workflowobj["$schemas"])

    def capture_default(obj):
        # Collects the locations of default File/Directory objects that
        # do not exist; a non-empty list means the default is dropped.
        missing = []

        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                sc[:] = [dep for dep in sc if dep["location"] != f["location"]]
                missing.append(f["location"])

        visit_class(obj["default"], ("File", "Directory"), add_default)
        if missing:
            # Delete "default" from workflowobj
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" not in p:
            return
        # Anonymous literals and existing keep references are left alone.
        if p["location"].startswith("_:") or p["location"].startswith("keep:"):
            return
        p["location"] = mapper.mapper(p["location"]).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        workflowobj["$schemas"] = [mapper.mapper(schema).resolved
                                   for schema in workflowobj["$schemas"]]

    return mapper
156
157
def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects.

    For a CommandLineTool, the image named by its DockerRequirement is
    requested through arv_docker_get_image, or "arvados/jobs" when the
    tool has no DockerRequirement.  For a Workflow, every step's embedded
    tool is processed recursively.  Other objects are ignored.

    Raises UnsupportedRequirement if the DockerRequirement sets
    'dockerOutputDirectory'.
    """

    if isinstance(tool, CommandLineTool):
        docker_req, _ = get_feature(tool, "DockerRequirement")
        if not docker_req:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
            return
        if docker_req.get("dockerOutputDirectory"):
            # TODO: can be supported by containers API, but not jobs API.
            raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
        arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        return

    if isinstance(tool, cwltool.workflow.Workflow):
        for step in tool.steps:
            upload_docker(arvrunner, step.embedded_tool)
174
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been
    combined into a single document.  The document identified by the
    tool's "id" is fetched through the tool's own loader and handed to
    pack() together with the tool's metadata.  arvrunner is not used.
    """

    loader = tool.doc_loader
    doc_id = tool.tool["id"]
    document = loader.fetch(doc_id)
    return pack(loader, document, doc_id, tool.metadata)
182
def tag_git_version(packed, tool=None):
    """Record the git commit of a local workflow in the packed document.

    When the tool's "id" is a file:// URI, "git log" is run in the
    directory containing that file and the resulting commit hash is
    stored in packed["http://schema.org/version"].  This is best effort:
    if git cannot be run or exits with an error, packed is left
    untouched.

    packed -- packed workflow document (dict), updated in place
    tool   -- object whose .tool["id"] names the workflow source.  The
              original code read an undefined name "tool" and raised
              NameError on every call; the parameter is optional so the
              one-argument call form remains valid, in which case there
              is nothing to inspect and the function does nothing.
    """
    if tool is None:
        return

    tool_id = tool.tool["id"]
    if not tool_id.startswith("file://"):
        return

    # Strip the "file://" scheme prefix to get the local path.
    path = os.path.dirname(tool_id[7:])
    try:
        githash = subprocess.check_output(
            ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
            stderr=subprocess.STDOUT, cwd=path).strip()
    except (OSError, subprocess.CalledProcessError):
        # git missing, directory missing, or not a git checkout.
        return

    if not isinstance(githash, str):
        # Python 3 returns bytes from check_output; store text instead.
        githash = githash.decode('utf-8')
    packed["http://schema.org/version"] = githash
192
193
def discover_secondary_files(inputs, job_order):
    """Fill in missing 'secondaryFiles' on File values of the job order.

    For each input parameter that declares secondaryFiles and has a value
    in job_order (looked up by the short name of its id), every File
    object in that value (directly, or nested in lists) that has no
    'secondaryFiles' entry gets one built by applying each declared
    pattern to the file's location with substitute().  job_order is
    modified in place.
    """

    def attach(value, patterns):
        if isinstance(value, dict) and value.get("class") == "File":
            if "secondaryFiles" not in value:
                value["secondaryFiles"] = [{"location": substitute(value["location"], sf), "class": "File"} for sf in patterns]
        elif isinstance(value, list):
            for element in value:
                attach(element, patterns)

    for param in inputs:
        key = shortname(param["id"])
        if key in job_order and param.get("secondaryFiles"):
            attach(job_order[key], param["secondaryFiles"])
207
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced by the input object.

    Secondary files are filled in first, then the dependencies of
    job_order are uploaded, which rewrites 'location' fields in place to
    keep references.  The same job_order object is returned, with the
    bookkeeping keys "id" and "job_order" removed.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    upload_dependencies(arvrunner,
                        name,
                        tool.doc_loader,
                        job_order,
                        job_order.get("id", "#"),
                        False)

    # "id" is no longer needed once dependencies have been scanned;
    # "job_order" gets added by cwltool when providing parameters on the
    # command line and needs to be filtered out.
    for key in ("id", "job_order"):
        if key in job_order:
            del job_order[key]

    return job_order
231
def upload_workflow_deps(arvrunner, tool, override_tools):
    """Upload Docker images and file dependencies for tool.

    Every document visited through tool.visit() that has an "id" gets its
    dependencies uploaded (without the primary document itself).  The
    updated document is stored back in the loader's index and its JSON
    serialization is recorded in override_tools under the document id.
    """
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" not in deptool:
            return
        tool_id = deptool["id"]
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(tool_id)),
                            loader,
                            deptool,
                            tool_id,
                            False,
                            include_primary=False)
        loader.idx[deptool["id"]] = deptool
        override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)
252
def arvados_jobs_image(arvrunner, img):
    """Make sure the requested arvados/jobs image is available.

    The image is requested through arv_docker_get_image.  Any failure is
    re-raised as an Exception naming the image and the underlying error.
    Returns img unchanged on success.
    """

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as err:
        message = "Docker image %s is not available\n%s" % (img, err)
        raise Exception(message)
    return img
261
def upload_workflow_collection(arvrunner, name, packed):
    """Store the packed workflow as "workflow.cwl" in a collection.

    The document is serialized as sorted, indented JSON.  If a collection
    with the same portable data hash and a name starting with `name`
    already exists (restricted to the runner's project when one is set),
    nothing new is saved; otherwise the collection is saved under `name`.

    Returns the portable data hash of the collection.
    """
    retries = arvrunner.num_retries
    project = arvrunner.project_uuid

    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=retries)
    with collection.open("workflow.cwl", "w") as out:
        out.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if project:
        filters.append(["owner_uuid", "=", project])
    found = arvrunner.api.collections().list(filters=filters).execute(num_retries=retries)

    if not found["items"]:
        collection.save_new(name=name,
                            owner_uuid=project,
                            ensure_unique_name=True,
                            num_retries=retries)
        logger.info("Uploaded to %s", collection.manifest_locator())
    else:
        logger.info("Using collection %s", found["items"][0]["uuid"])

    return collection.portable_data_hash()
285
286
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        """Store the submission settings.

        Reuse requested by the caller can be overridden by the workflow's
        ReuseRequirement.  A falsy submit_runner_ram selects the default
        of 3000; a value of zero or less after that raises Exception.
        """
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False

        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse

        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        self.submit_runner_ram = submit_runner_ram if submit_runner_ram else 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        """Hook with no behavior in the base class."""
        pass

    def done(self, record):
        """Base method for handling a completed runner.

        Derives the process status from the record's state and exit code,
        prints the tail of the log on failure, loads cwl.output.json from
        the output collection and reports the outputs through the
        runner's output_callback.  Any error while doing so is logged and
        reported as "permanentFail" with empty outputs.  The record is
        always removed from the runner's process table afterwards.
        """

        def read_collection(locator):
            return arvados.collection.CollectionReader(locator,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)

        try:
            if record["state"] != "Complete":
                processStatus = "permanentFail"
            elif record.get("exit_code") is None:
                # No exit code recorded for a completed run counts as success.
                processStatus = "success"
            elif record["exit_code"] == 33:
                processStatus = "UnsupportedRequirement"
            elif record["exit_code"] == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = read_collection(record["log"])
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = read_collection(self.final_output)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)

            def keepify(fileobj):
                # Turn locations relative to the output collection into
                # full keep references.
                location = fileobj["location"]
                if location.startswith("keep:"):
                    return
                fileobj["location"] = "keep:%s/%s" % (record["output"], location)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]