logger = logging.getLogger('arvados.cwl-runner')
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+
+def trim_listing(obj):
+ """Remove 'listing' field from Directory objects that are keep references.
+
+ When Directory objects represent Keep references, it is redundant and
+ potentially very expensive to pass fully enumerated Directory objects
+ between instances of cwl-runner (e.g. submitting a job, or using the
+ RunInSingleContainer feature), so delete the 'listing' field when it is
+ safe to do so.
+
+ The object is modified in place and nothing is returned. A 'listing'
+ is only dropped when 'location' starts with "keep:". Separately, a
+ 'location' that starts with "_:" is deleted from the object.
+
+ NOTE(review): "_:" presumably marks a generated/anonymous identifier
+ that is not meaningful to another cwl-runner instance -- confirm.
+ """
+
+ if obj.get("location", "").startswith("keep:") and "listing" in obj:
+ del obj["listing"]
+ if obj.get("location", "").startswith("_:"):
+ del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run):
+ """Upload the dependencies of the workflowobj document to Keep.
+
+ Returns a pathmapper object mapping local paths to keep references. Also
+ does an in-place update of references in "workflowobj".
+
+ Use scandeps to find $import, $include, $schemas, run, File and Directory
+ fields that represent external references.
+
+ If workflowobj has an "id" field, this will reload the document to ensure
+ it is scanning the raw document prior to preprocessing.
+ """
+
loaded = set()
def loadref(b, u):
joined = urlparse.urljoin(b, u)
sc = scandeps(uri, scanobj,
loadref_fields,
- set(("$include", "$schemas")),
+ set(("$include", "$schemas", "location")),
loadref)
- files = []
- def visitFiles(path):
- files.append(path)
-
- adjustFileObjs(sc, visitFiles)
- adjustDirObjs(sc, visitFiles)
-
- normalizeFilesDirs(files)
+ normalizeFilesDirs(sc)
if "id" in workflowobj:
- files.append({"class": "File", "location": workflowobj["id"]})
+ sc.append({"class": "File", "location": workflowobj["id"]})
- mapper = ArvPathMapper(arvrunner, files, "",
+ mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
"keep:%s/%s",
name=name)
def setloc(p):
- p["location"] = mapper.mapper(p["location"]).target
+ if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+ p["location"] = mapper.mapper(p["location"]).resolved
adjustFileObjs(workflowobj, setloc)
adjustDirObjs(workflowobj, setloc)
self.job_order.get("id", "#"),
False)
+ adjustDirObjs(self.job_order, trim_listing)
+
if "id" in self.job_order:
del self.job_order["id"]