# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done  # used by Runner.done() for log tailing

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
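
# Illustrative sketch (hypothetical data): this is meant to be applied over a
# record with cwltool's adjustFileObjs/adjustDirObjs, e.g.:
#
#     outputs = {"out": {"class": "File", "location": "_:b0a88f67", "basename": "a.txt"}}
#     adjustFileObjs(outputs, trim_anonymous_location)
#     # anonymous "_:..." locations are removed; real locations are kept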

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved

    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    if "$schemas" in workflowobj:
        sch = []
        for s in workflowobj["$schemas"]:
            sch.append(mapper.mapper(s).resolved)
        workflowobj["$schemas"] = sch

    return mapper
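
# Illustrative call (variable names are assumptions): after this returns, the
# File/Directory locations inside workflowobj point at "keep:..." URIs.
#
#     mapper = upload_dependencies(arvrunner, "workflow dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], True)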

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
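
# Sketch of the requirement dict this inspects (standard CWL DockerRequirement,
# hypothetical image name); for a Workflow it recurses into each step's
# embedded_tool:
#
#     {"class": "DockerRequirement", "dockerPull": "debian:8"}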

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
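
# Illustrative call: pack() inlines every $import/run reference, so the result
# can be stored as a single "workflow.cwl" document (see
# upload_workflow_collection below):
#
#     packed = packed_workflow(arvrunner, tool)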

def tag_git_version(packed, tool):
    # "tool" (the unpacked cwltool object, whose file:// id locates the source
    # checkout) is assumed to be passed in alongside the packed document.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(
                ['git', 'log', '--first-parent', '--max-count=1', '--format=%H'],
                stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            # Not a git checkout, or git is unavailable; leave packed untagged.
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
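
# Illustrative call (argument names are assumptions): uploads the local files
# named in the input object and rewrites their locations in place:
#
#     job_order = upload_job_order(arvrunner, "input object", tool, job_order)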

def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)
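
# Illustrative call: override_tools collects the post-upload version of each
# tool document, keyed by id, so a submitted runner can reuse it:
#
#     override_tools = {}
#     upload_workflow_deps(arvrunner, tool, override_tools)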

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
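
# Illustrative call: the default image tag tracks this package's version (see
# Runner.__init__ below):
#
#     img = arvados_jobs_image(arvrunner, "arvados/jobs:" + __version__)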

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
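
# Illustrative call: returns the portable data hash of the (possibly
# pre-existing) collection holding the packed workflow:
#
#     pdh = upload_workflow_collection(arvrunner, "my workflow", packed)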

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            # Default RAM request for the runner process, in MiB.
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        # Exit code 33 is the runner's convention for an
                        # unsupported requirement.
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]