# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from . import done
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
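
# Illustrative sketch (not part of the original module): a File literal that
# picked up an anonymous id keeps its other fields but loses the generated
# 'location'.  The id below is hypothetical.
#
#     literal = {"class": "File", "location": "_:da39f2", "basename": "x.txt"}
#     trim_anonymous_location(literal)
#     # literal is now {"class": "File", "basename": "x.txt"}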
49 def find_defaults(d, op):
50 if isinstance(d, list):
53 if isinstance(d, dict):
57 for i in d.itervalues():
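
# Illustrative sketch: find_defaults recursively walks nested lists and dicts
# and calls `op` on every dict carrying a "default" key, e.g. a CWL input
# binding (the document below is hypothetical):
#
#     doc = {"inputs": [{"id": "#main/x",
#                        "default": {"class": "File", "path": "x.txt"}}]}
#     find_defaults(doc, lambda d: logger.info("found default on %s", d["id"]))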

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if not arvrunner.fs_access.exists(f["location"]):
                # Doesn't exist, remove from list of dependencies to upload
                for i in range(0, len(sc)):
                    if sc[i]["location"] == f["location"]:
                        del sc[i]
                        break
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)
149 if "$schemas" in workflowobj:
151 for s in workflowobj["$schemas"]:
152 sch.append(mapper.mapper(s).resolved)
153 workflowobj["$schemas"] = sch
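
# Illustrative usage sketch (the variable names are hypothetical): upload a
# loaded tool's dependencies and rewrite its references in place.
#
#     mapper = upload_dependencies(arvrunner, "my-tool dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)
#     # tool.tool now refers to "keep:" locations; mapper maps the original
#     # local paths to those keep references.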

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
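
# Illustrative sketch: the kind of CWL requirement upload_docker acts on
# (the image name is hypothetical):
#
#     requirements:
#       DockerRequirement:
#         dockerPull: ubuntu:16.04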

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
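
# Illustrative sketch: a packed document gathers the workflow and every tool
# it runs under a single "$graph", with "#main" as the top-level process:
#
#     {"cwlVersion": "v1.0",
#      "$graph": [{"class": "Workflow", "id": "#main", "steps": [...]},
#                 {"class": "CommandLineTool", "id": "#step1.cwl", ...}]}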

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
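
# The subprocess call above is equivalent to running, from the workflow's
# directory:
#
#     git log --first-parent --max-count=1 --format=%H
#
# which prints the commit hash of the current HEAD; the hash is recorded in
# the packed document under "http://schema.org/version".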

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
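
# Illustrative sketch (hypothetical input): a job order referencing a local
# file comes back with 'location' rewritten to point into Keep:
#
#     {"inp": {"class": "File", "location": "file:///tmp/x.txt"}}
#     # becomes, after upload_job_order:
#     {"inp": {"class": "File", "location": "keep:<portable data hash>/x.txt"}}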

def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
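
# Illustrative usage sketch: ensure the runner image matching this package
# version is available before submitting a runner process.
#
#     img = arvados_jobs_image(arvrunner, "arvados/jobs:"+__version__)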

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
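
# Illustrative usage sketch: store a packed workflow document in Keep and get
# back its portable data hash, reusing an existing collection when one with
# the same content already exists in the target project.
#
#     pdh = upload_workflow_collection(arvrunner, "my workflow", packed)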

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # arvados-cwl-runner exits 33 when it fails on an
                        # unsupported CWL requirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]