# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.

    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
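
# "path", "nameext", "nameroot" and "dirname" are derived from "location" and
# "basename" and can be recomputed, so they are redundant in records that get
# written back out.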
def remove_redundant_fields(obj):
    for field in ("path", "nameext", "nameroot", "dirname"):
        if field in obj:
            del obj[field]
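
# Recursively walk lists and dicts in a workflow document and call "op" on
# every dict that carries a "default" field (such as input parameters with
# default File or Directory values).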
def find_defaults(d, op):
    if isinstance(d, list):
        for i in d:
            find_defaults(i, op)
    elif isinstance(d, dict):
        if "default" in d:
            op(d)
        else:
            for i in d.itervalues():
                find_defaults(i, op)

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}
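
    # When loadref_run is true, "run" references (tools and subworkflows used
    # by workflow steps) are also followed and scanned for dependencies.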
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})
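
    # File and Directory literals in "default" fields refer to local paths.
    # Use "path" as the "location" where needed, and if the referenced file
    # does not exist locally, drop it from the upload set and remove the
    # default entirely.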
    def capture_default(obj):
        remove = [False]
        def add_default(f):
            if "location" not in f and "path" in f:
                f["location"] = f["path"]
                del f["path"]
            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
                # Remove from sc
                sc[:] = [x for x in sc if x["location"] != f["location"]]
                # Delete "default" from workflowobj
                remove[0] = True
        visit_class(obj["default"], ("File", "Directory"), add_default)
        if remove[0]:
            del obj["default"]

    find_defaults(workflowobj, capture_default)
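
    # Upload everything collected in sc to Keep (as a single collection) and
    # build the mapping from local paths to "keep:" references.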
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
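
    # Rewrite "location" fields in the workflow document to the corresponding
    # "keep:" references, leaving literals ("_:...") and locations already in
    # Keep untouched.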
144 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
145 p["location"] = mapper.mapper(p["location"]).resolved
146 adjustFileObjs(workflowobj, setloc)
147 adjustDirObjs(workflowobj, setloc)
149 if "$schemas" in workflowobj:
151 for s in workflowobj["$schemas"]:
152 sch.append(mapper.mapper(s).resolved)
153 workflowobj["$schemas"] = sch

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
        else:
            arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
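
# Tag the packed workflow document with the git commit hash of the checkout
# containing the top-level workflow file, when there is one.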
def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
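
# For each workflow input parameter that declares secondaryFiles patterns,
# fill in the matching secondaryFiles entries on the corresponding File
# values in the input object, expanding each pattern with cwltool's
# substitute().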
def discover_secondary_files(inputs, job_order):
    for t in inputs:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    discover_secondary_files(tool.tool["inputs"], job_order)

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
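
# Upload Docker images and file dependencies for every tool document reached
# from this workflow, recording the rewritten (keep:-referenced) tool
# documents in override_tools keyed by tool id.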
def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(deptool["id"])),
                            document_loader,
                            deptool,
                            deptool["id"],
                            False,
                            include_primary=False)
        document_loader.idx[deptool["id"]] = deptool
        override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
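
# Save the packed workflow as "workflow.cwl" in a Keep collection.  If a
# collection with the same content already exists in the target project it is
# reused; either way the collection's portable data hash is returned.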
def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        if enable_reuse:
            # If reuse is permitted by command line arguments but
            # disabled by the workflow itself, disable it.
            reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
            if reuse_req:
                enable_reuse = reuse_req["enableReuse"]
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]