import logging
import json
import re
+from cStringIO import StringIO
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import arvados.collection
+import ruamel.yaml as yaml
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')
-cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
+
+def trim_listing(obj):
+    """Remove redundant fields from Keep-backed Directory objects.
+
+    When Directory objects represent Keep references, it is redundant and
+    potentially very expensive to pass fully enumerated Directory objects
+    between instances of cwl-runner (e.g. submitting a job, or using the
+    RunInSingleContainer feature), so delete the 'listing' field when it is
+    safe to do so.
+
+    Also deletes the 'location' field of anonymous ("_:"-prefixed)
+    directories, whose ids are only meaningful within this process.
+    """
+
+    # Keep-backed directory: the listing can be re-enumerated from Keep on
+    # demand, so drop the (potentially very large) cached listing.
+    if obj.get("location", "").startswith("keep:") and "listing" in obj:
+        del obj["listing"]
+    # Anonymous directory: strip the local-only "_:" identifier.
+    if obj.get("location", "").startswith("_:"):
+        del obj["location"]
+
+def upload_dependencies(arvrunner, name, document_loader,
+                        workflowobj, uri, loadref_run):
+    """Upload the dependencies of the workflowobj document to Keep.
+
+    Returns a pathmapper object mapping local paths to keep references.  Also
+    does an in-place update of references in "workflowobj".
+
+    Use scandeps to find $import, $include, $schemas, run, File and Directory
+    fields that represent external references.
+
+    If workflowobj has an "id" field, this will reload the document to ensure
+    it is scanning the raw document prior to preprocessing.
+    """
+
+    loaded = set()
+    def loadref(b, u):
+        # NOTE(review): depends on module-level `urlparse` — not visible in
+        # this hunk; confirm it is imported elsewhere in the file.
+        joined = urlparse.urljoin(b, u)
+        defrg, _ = urlparse.urldefrag(joined)
+        if defrg not in loaded:
+            loaded.add(defrg)
+            # Use fetch_text to get raw file (before preprocessing).
+            text = document_loader.fetch_text(defrg)
+            if isinstance(text, bytes):
+                textIO = StringIO(text.decode('utf-8'))
+            else:
+                textIO = StringIO(text)
+            return yaml.safe_load(textIO)
+        else:
+            # Already visited: return an empty document so scandeps does not
+            # descend into the same reference twice.
+            return {}
+
+    # Only follow "run" references when the caller requests it (e.g. when
+    # scanning a workflow); "$import" is always followed.
+    if loadref_run:
+        loadref_fields = set(("$import", "run"))
+    else:
+        loadref_fields = set(("$import",))
+
+    scanobj = workflowobj
+    if "id" in workflowobj:
+        # Need raw file content (before preprocessing) to ensure
+        # that external references in $include and $mixin are captured.
+        scanobj = loadref("", workflowobj["id"])
+
+    sc = scandeps(uri, scanobj,
+                  loadref_fields,
+                  set(("$include", "$schemas", "location")),
+                  loadref)
+
+    # Canonicalize the discovered File/Directory objects before mapping.
+    normalizeFilesDirs(sc)
+
+    if "id" in workflowobj:
+        # The top-level document itself is also a dependency to upload.
+        sc.append({"class": "File", "location": workflowobj["id"]})
+
+    mapper = ArvPathMapper(arvrunner, sc, "",
+                           "keep:%s",
+                           "keep:%s/%s",
+                           name=name)
+
+    def setloc(p):
+        # Rewrite local references to their uploaded keep: locations; leave
+        # anonymous ("_:") and already-keep references untouched.
+        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+            p["location"] = mapper.mapper(p["location"]).resolved
+    adjustFileObjs(workflowobj, setloc)
+    adjustDirObjs(workflowobj, setloc)
+
+    return mapper
+
+
+def upload_docker(arvrunner, tool):
+    """Make the Docker image(s) required by `tool` available in Arvados.
+
+    For a CommandLineTool with a DockerRequirement, resolves/uploads the
+    image via arv_docker_get_image; for a Workflow, recurses into each
+    step's embedded tool.
+    """
+    if isinstance(tool, CommandLineTool):
+        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+        if docker_req:
+            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+    elif isinstance(tool, cwltool.workflow.Workflow):
+        # Recurse so every step of a (possibly nested) workflow is covered.
+        for s in tool.steps:
+            upload_docker(arvrunner, s.embedded_tool)
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
def update_pipeline_component(self, record):
pass
- def upload_docker(self, tool):
- if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
- if docker_req:
- arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- self.upload_docker(s.embedded_tool)
-
-
def arvados_job_spec(self, *args, **kwargs):
- self.upload_docker(self.tool)
-
- workflowfiles = []
- jobfiles = []
- workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
+ upload_docker(self.arvrunner, self.tool)
self.name = os.path.basename(self.tool.tool["id"])
- def visitFiles(files, path):
- files.append(path)
+ workflowmapper = upload_dependencies(self.arvrunner,
+ self.name,
+ self.tool.doc_loader,
+ self.tool.tool,
+ self.tool.tool["id"],
+ True)
- document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
- loaded = set()
- def loadref(b, u):
- joined = urlparse.urljoin(b, u)
- if joined not in loaded:
- loaded.add(joined)
- return document_loader.fetch(urlparse.urljoin(b, u))
- else:
- return {}
-
- sc = scandeps(uri, workflowobj,
- set(("$import", "run")),
- set(("$include", "$schemas", "path", "location")),
- loadref)
- adjustFileObjs(sc, partial(visitFiles, workflowfiles))
- adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
-
- keepprefix = kwargs.get("keepprefix", "")
- workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- keepprefix+"%s",
- keepprefix+"%s/%s",
- name=self.name,
- **kwargs)
-
- jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- keepprefix+"%s",
- keepprefix+"%s/%s",
- name=os.path.basename(self.job_order.get("id", "#")),
- **kwargs)
-
- def setloc(p):
- p["location"] = jobmapper.mapper(p["location"])[1]
- adjustFileObjs(self.job_order, setloc)
+ jobmapper = upload_dependencies(self.arvrunner,
+ os.path.basename(self.job_order.get("id", "#")),
+ self.tool.doc_loader,
+ self.job_order,
+ self.job_order.get("id", "#"),
+ False)
+
+ adjustDirObjs(self.job_order, trim_listing)
if "id" in self.job_order:
del self.job_order["id"]
outc = arvados.collection.Collection(record["output"])
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
- def keepify(path):
+ def keepify(fileobj):
+ path = fileobj["location"]
if not path.startswith("keep:"):
- return "keep:%s/%s" % (record["output"], path)
- else:
- return path
- adjustFiles(outputs, keepify)
+ fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+ adjustFileObjs(outputs, keepify)
+ adjustDirObjs(outputs, keepify)
except Exception as e:
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)