import logging
import json
import re
+from cStringIO import StringIO
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import arvados.collection
+import ruamel.yaml as yaml
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, keepprefix, loadref_run):
+ workflowobj, uri, loadref_run):
+ """Upload the dependencies of the workflowobj document to Keep.
+
+ Returns a pathmapper object mapping local paths to keep references. Also
+ does an in-place update of references in "workflowobj".
+
+ Use scandeps to find $import, $include, $schemas, run, File and Directory
+ fields that represent external references.
+
+ If workflowobj has an "id" field, this will reload the document to ensure
+ it is scanning the raw document prior to preprocessing.
+ """
+
loaded = set()
def loadref(b, u):
joined = urlparse.urljoin(b, u)
- if joined not in loaded:
- loaded.add(joined)
- return document_loader.fetch(urlparse.urljoin(b, u))
+ defrg, _ = urlparse.urldefrag(joined)
+ if defrg not in loaded:
+ loaded.add(defrg)
+ # Use fetch_text to get raw file (before preprocessing).
+ text = document_loader.fetch_text(defrg)
+ if isinstance(text, bytes):
+ textIO = StringIO(text.decode('utf-8'))
+ else:
+ textIO = StringIO(text)
+ return yaml.safe_load(textIO)
else:
return {}
else:
loadref_fields = set(("$import",))
- sc = scandeps(uri, workflowobj,
+ scanobj = workflowobj
+ if "id" in workflowobj:
+ # Need raw file content (before preprocessing) to ensure
+ # that external references in $include and $mixin are captured.
+ scanobj = loadref("", workflowobj["id"])
+
+ sc = scandeps(uri, scanobj,
loadref_fields,
- set(("$include", "$schemas", "path", "location")),
+ set(("$include", "$schemas", "location")),
loadref)
files = []
files.append({"class": "File", "location": workflowobj["id"]})
mapper = ArvPathMapper(arvrunner, files, "",
- keepprefix+"%s",
- keepprefix+"%s/%s",
+ "keep:%s",
+ "keep:%s/%s",
name=name)
def setloc(p):
self.tool.doc_loader,
self.tool.tool,
self.tool.tool["id"],
- kwargs.get("keepprefix", ""),
True)
jobmapper = upload_dependencies(self.arvrunner,
self.tool.doc_loader,
self.job_order,
self.job_order.get("id", "#"),
- kwargs.get("keepprefix", ""),
False)
if "id" in self.job_order: