import logging
import json
import re
+from cStringIO import StringIO
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import arvados.collection
+import ruamel.yaml as yaml
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+
+def trim_listing(obj):
+ """Remove 'listing' field from Directory objects that are keep references.
+
+ When Directory objects represent Keep references, it redundant and
+ potentially very expensive to pass fully enumerated Directory objects
+ between instances of cwl-runner (e.g. a submitting a job, or using the
+ RunInSingleContainer feature), so delete the 'listing' field when it is
+ safe to do so.
+ """
+
+ if obj.get("location", "").startswith("keep:") and "listing" in obj:
+ del obj["listing"]
+ if obj.get("location", "").startswith("_:"):
+ del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, keepprefix, loadref_run):
+ workflowobj, uri, loadref_run):
+ """Upload the dependencies of the workflowobj document to Keep.
+
+ Returns a pathmapper object mapping local paths to keep references. Also
+ does an in-place update of references in "workflowobj".
+
+ Use scandeps to find $import, $include, $schemas, run, File and Directory
+ fields that represent external references.
+
+ If workflowobj has an "id" field, this will reload the document to ensure
+ it is scanning the raw document prior to preprocessing.
+ """
+
loaded = set()
def loadref(b, u):
joined = urlparse.urljoin(b, u)
- if joined not in loaded:
- loaded.add(joined)
- return document_loader.fetch(urlparse.urljoin(b, u))
+ defrg, _ = urlparse.urldefrag(joined)
+ if defrg not in loaded:
+ loaded.add(defrg)
+ # Use fetch_text to get raw file (before preprocessing).
+ text = document_loader.fetch_text(defrg)
+ if isinstance(text, bytes):
+ textIO = StringIO(text.decode('utf-8'))
+ else:
+ textIO = StringIO(text)
+ return yaml.safe_load(textIO)
else:
return {}
else:
loadref_fields = set(("$import",))
- sc = scandeps(uri, workflowobj,
+ scanobj = workflowobj
+ if "id" in workflowobj:
+ # Need raw file content (before preprocessing) to ensure
+ # that external references in $include and $mixin are captured.
+ scanobj = loadref("", workflowobj["id"])
+
+ sc = scandeps(uri, scanobj,
loadref_fields,
- set(("$include", "$schemas", "path", "location")),
+ set(("$include", "$schemas", "location")),
loadref)
- files = []
- def visitFiles(path):
- files.append(path)
-
- adjustFileObjs(sc, visitFiles)
- adjustDirObjs(sc, visitFiles)
-
- normalizeFilesDirs(files)
+ normalizeFilesDirs(sc)
if "id" in workflowobj:
- files.append({"class": "File", "location": workflowobj["id"]})
+ sc.append({"class": "File", "location": workflowobj["id"]})
- mapper = ArvPathMapper(arvrunner, files, "",
- keepprefix+"%s",
- keepprefix+"%s/%s",
+ mapper = ArvPathMapper(arvrunner, sc, "",
+ "keep:%s",
+ "keep:%s/%s",
name=name)
def setloc(p):
- p["location"] = mapper.mapper(p["location"]).target
+ if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+ p["location"] = mapper.mapper(p["location"]).resolved
adjustFileObjs(workflowobj, setloc)
adjustDirObjs(workflowobj, setloc)
class Runner(object):
- def __init__(self, runner, tool, job_order, enable_reuse):
+ def __init__(self, runner, tool, job_order, enable_reuse, output_name):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.running = False
self.enable_reuse = enable_reuse
self.uuid = None
+ self.final_output = None
+ self.output_name = output_name
def update_pipeline_component(self, record):
pass
self.tool.doc_loader,
self.tool.tool,
self.tool.tool["id"],
- kwargs.get("keepprefix", ""),
True)
jobmapper = upload_dependencies(self.arvrunner,
self.tool.doc_loader,
self.job_order,
self.job_order.get("id", "#"),
- kwargs.get("keepprefix", ""),
False)
+ adjustDirObjs(self.job_order, trim_listing)
+
if "id" in self.job_order:
del self.job_order["id"]
outputs = None
try:
try:
- outc = arvados.collection.Collection(record["output"])
+ self.final_output = record["output"]
+ outc = arvados.collection.CollectionReader(self.final_output,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
def keepify(fileobj):