# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
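
# For example, a File literal carrying an anonymous id (values hypothetical):
#   {"class": "File", "location": "_:da39a3ee", "basename": "a.txt", "contents": "x"}
# is written back out without the "location" field:
#   {"class": "File", "basename": "a.txt", "contents": "x"}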

def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)
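
# find_defaults walks an arbitrarily nested workflow document and applies "op"
# to each mapping that has a "default" key (such as an input parameter with a
# default File value); recursion stops at each match.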

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Default file doesn't exist, remove from list of dependencies to upload
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)
144 if "$schemas" in workflowobj:
146 for s in workflowobj["$schemas"]:
147 sch.append(mapper.mapper(s).resolved)
148 workflowobj["$schemas"] = sch
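
# A minimal usage sketch (names hypothetical): given a tool document with
# local file references,
#   mapper = upload_dependencies(arvrunner, "mytool dependencies",
#                                tool.doc_loader, tool.tool,
#                                tool.tool["id"], False)
# entries such as "location": "file:///home/me/input.txt" in the document are
# rewritten in place to "keep:<portable_data_hash>/input.txt" references.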

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
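
# A DockerRequirement handled above looks like (image name hypothetical):
#   {"class": "DockerRequirement", "dockerPull": "ubuntu:16.04"}
# arv_docker_get_image pulls the image if it isn't already available and
# uploads it to the project given by arvrunner.project_uuid; tools with no
# DockerRequirement fall back to the stock "arvados/jobs" image.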

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
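
# The packed result is one CWL document with every subworkflow and tool
# inlined under "$graph", roughly:
#   {"cwlVersion": "v1.0",
#    "$graph": [{"class": "Workflow", "id": "#main", ...},
#               {"class": "CommandLineTool", "id": "#step1.cwl", ...}]}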

def tag_git_version(packed, tool):
    # "tool" supplies the location of the primary workflow document, used to
    # find the git checkout it was loaded from.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])
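
    # (Hypothetical example) with "secondaryFiles": [".fai"] declared on an
    # input, a job_order entry {"class": "File", "location": "keep:.../ref.fa"}
    # picks up {"class": "File", "location": "keep:.../ref.fa.fai"}; a leading
    # "^" in the pattern strips one extension before appending.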

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order

def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
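
# Matching on portable_data_hash reuses an existing collection rather than
# saving a duplicate: the hash is content-addressed, so equal hashes imply
# the packed workflow document is identical.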

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
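                        # Exit code 33 is the arvados-cwl-runner convention
                        # for "workflow used an unsupported feature".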
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]