import re
import subprocess
-from cStringIO import StringIO
+from StringIO import StringIO
from schema_salad.sourceline import SourceLine
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
+ if "$schemas" in workflowobj:
+ for s in workflowobj["$schemas"]:
+ sc.append({"class": "File", "location": s})
+
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
"keep:%s/%s",
adjustFileObjs(workflowobj, setloc)
adjustDirObjs(workflowobj, setloc)
+ if "$schemas" in workflowobj:
+ sch = []
+ for s in workflowobj["$schemas"]:
+ sch.append(mapper.mapper(s).resolved)
+ workflowobj["$schemas"] = sch
+
return mapper
def upload_docker(arvrunner, tool):
- """Visitor which uploads Docker images referenced in CommandLineTool objects."""
+ """Uploads Docker images used in CommandLineTool objects."""
+
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ elif isinstance(tool, cwltool.workflow.Workflow):
+ for s in tool.steps:
+ upload_docker(arvrunner, s.embedded_tool)
def packed_workflow(arvrunner, tool):
"""Create a packed workflow.
def upload_workflow_deps(arvrunner, tool):
# Ensure that Docker images needed by this workflow are available
- tool.visit(partial(upload_docker, arvrunner))
+
+ upload_docker(arvrunner, tool)
document_loader = tool.doc_loader
raise Exception("Docker image %s is not available\n%s" % (img, e) )
return img
def upload_workflow_collection(arvrunner, name, packed):
    """Write a packed workflow to a collection and return its portable data hash.

    The packed document is serialized as JSON into a file named
    "workflow.cwl" inside a new, not-yet-saved collection.  The API server
    is then queried for a collection with the same portable data hash whose
    name starts with ``name`` (additionally restricted to
    ``arvrunner.project_uuid`` when that is set).  The new collection is
    saved only if the query returns no items; otherwise the first match is
    logged and nothing is saved.

    :param arvrunner: object providing ``api``, ``keep_client``,
        ``num_retries`` and ``project_uuid``.
    :param name: name to give a newly saved collection; also the prefix used
        in the ``like`` filter when looking for an existing copy.
    :param packed: JSON-serializable packed workflow document.
    :returns: the portable data hash of the collection content.
    """
    retries = arvrunner.num_retries
    owner = arvrunner.project_uuid

    wf_collection = arvados.collection.Collection(
        api_client=arvrunner.api,
        keep_client=arvrunner.keep_client,
        num_retries=retries)

    with wf_collection.open("workflow.cwl", "w") as out:
        out.write(json.dumps(packed,
                             indent=2,
                             sort_keys=True,
                             separators=(',', ': ')))

    # Search by content hash plus name prefix; the prefix match also covers
    # names that were made unique by ensure_unique_name on an earlier save.
    search = [
        ["portable_data_hash", "=", wf_collection.portable_data_hash()],
        ["name", "like", name + "%"],
    ]
    if owner:
        search.append(["owner_uuid", "=", owner])

    request = arvrunner.api.collections().list(filters=search)
    matches = request.execute(num_retries=retries)["items"]

    if not matches:
        wf_collection.save_new(name=name,
                               owner_uuid=owner,
                               ensure_unique_name=True,
                               num_retries=retries)
        logger.info("Uploaded to %s", wf_collection.manifest_locator())
    else:
        logger.info("Using collection %s", matches[0]["uuid"])

    return wf_collection.portable_data_hash()
+
+
class Runner(object):
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""