# sdk/cwl/arvados_cwl/runner.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
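
# Illustrative example (not from the original source): a File literal such as
#   {"class": "File", "location": "_:9e1b3a6e", "basename": "msg.txt", "contents": "hello"}
# has its "location" removed by trim_anonymous_location(), while a resolved
# reference such as {"class": "File", "location": "keep:.../msg.txt"} is left
# untouched.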

def find_defaults(d, op):
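    """Recursively walk lists and dicts in d, applying op() to every dict that
    has a "default" key."""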
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """
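
    # Illustrative example (paths and hashes are made up, not from the original
    # source): a reference such as
    #   {"class": "File", "location": "file:///home/user/wf/reference.fasta"}
    # found by scandeps is uploaded through ArvPathMapper and rewritten in
    # place to
    #   {"class": "File", "location": "keep:99999999999999999999999999999999+1234/reference.fasta"}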

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper


def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""
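
    # Illustrative example (not from the original source): a CommandLineTool with
    #   requirements:
    #     DockerRequirement:
    #       dockerPull: ubuntu:16.04
    # triggers arv_docker_get_image() for that image, while a tool with no
    # DockerRequirement falls back to the default "arvados/jobs" image below.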

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
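
# Note: cwltool's pack() typically gathers the main workflow and the tools it
# runs into a single document (usually under a "$graph" list), so the result
# can be stored and re-loaded as one file.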

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash


def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return the updated
    input object with 'location' pointing to the corresponding Keep references.
    """
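
    # Illustrative example (names and hashes are made up, not from the original
    # source): an input object entry such as
    #   {"reads": {"class": "File", "location": "file:///home/user/reads.fastq"}}
    # is returned with the file uploaded and its location rewritten to
    #   {"reads": {"class": "File", "location": "keep:99999999999999999999999999999999+5678/reads.fastq"}}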

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Filter out the "job_order" key that cwltool adds when parameters are
    # provided on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool, override_tools):
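    """Upload Docker images and file dependencies for every tool in the workflow.

    Each tool document that has an "id" is updated in place with Keep
    references, re-registered in the document loader index, and recorded in
    override_tools as serialized JSON.
    """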
    # Ensure that Docker images needed by this workflow are available

    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
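    """Save the packed workflow document to Keep as "workflow.cwl".

    If a collection with the same content and a matching name prefix already
    exists (optionally within arvrunner.project_uuid), it is reused instead of
    saving a new copy.  Returns the collection's portable data hash.
    """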
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""
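
        # Exit code 33 from the runner process is treated as an
        # UnsupportedRequirement failure, 0 as success, and any other nonzero
        # exit code as a permanent failure (see the mapping below).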

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]