input_obj = @updates[:mounts].andand[:"/var/lib/cwl/cwl.input.json"].andand[:content]
if input_obj
workflow = @object.mounts[:"/var/lib/cwl/workflow.json"][:content]
- workflow[:inputs].each do |input_schema|
- if not input_obj.include? input_schema[:id]
+ get_cwl_inputs(workflow).each do |input_schema|
+ if not input_obj.include? cwl_shortname(input_schema[:id])
next
end
required, primary_type, param_id = cwl_input_info(input_schema)
attrs['state'] = "Uncommitted"
# required
- attrs['command'] = ["arvados-cwl-runner", "--local", "--api=containers", "/var/lib/cwl/workflow.json", "/var/lib/cwl/cwl.input.json"]
+ attrs['command'] = ["arvados-cwl-runner", "--local", "--api=containers", "/var/lib/cwl/workflow.json#main", "/var/lib/cwl/cwl.input.json"]
attrs['container_image'] = "arvados/jobs"
attrs['cwd'] = "/var/spool/cwl"
attrs['output_path'] = "/var/spool/cwl"
lt
end
+ # Return the array of CWL input schemas for +workflow+.
+ # Supports both an unpacked workflow document (inputs at the top level)
+ # and a packed document, where the inputs live on the "#main" entry of
+ # the :"$graph" array.
+ def get_cwl_inputs(workflow)
+ if workflow[:inputs]
+ # Unpacked document: inputs are directly on the workflow object.
+ return workflow[:inputs]
+ else
+ # Packed document: locate the "#main" process in $graph.
+ workflow[:"$graph"].each do |tool|
+ if tool[:id] == "#main"
+ return tool[:inputs]
+ end
+ end
+ end
+ # NOTE(review): if $graph has no "#main" entry, the `each` expression
+ # (i.e. the $graph array itself) is the implicit return value, which is
+ # probably not what callers expect -- verify inputs are always valid.
+ end
+
+ # Reduce a CWL identifier to its short name: drop a leading "#" and
+ # return the last "/"-separated component (e.g. "#main/x" -> "x").
+ def cwl_shortname(id)
+ if id[0] == "#"
+ id = id[1..-1]
+ end
+ return id.split("/")[-1]
+ end
+
def cwl_input_info(input_schema)
required = !(input_schema[:type].include? "null")
if input_schema[:type].is_a? Array
elsif input_schema[:type].is_a? Hash
primary_type = input_schema[:type]
end
- param_id = input_schema[:id]
+ param_id = cwl_shortname(input_schema[:id])
return required, primary_type, param_id
end
"data-type" => "select",
"data-source" => (opt_empty_selection + [{value: "true", text: "true"}, {value: "false", text: "false"}]).to_json,
"data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
- "data-title" => "Set value for #{input_schema[:id]}",
+ "data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
"data-name" => dn,
"data-pk" => "{id: \"#{object.uuid}\", key: \"#{object.class.to_s.underscore}\"}",
"data-value" => attrvalue.to_s,
"data-type" => "select",
"data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => i, :text => i} }).to_json,
"data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
- "data-title" => "Set value for #{input_schema[:id]}",
+ "data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
"data-name" => dn,
"data-pk" => "{id: \"#{object.uuid}\", key: \"#{object.class.to_s.underscore}\"}",
"data-value" => attrvalue,
"data-placement" => "bottom",
"data-type" => datatype,
"data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
- "data-title" => "Set value for #{input_schema[:id]}",
+ "data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
"data-name" => dn,
"data-pk" => "{id: \"#{object.uuid}\", key: \"#{object.class.to_s.underscore}\"}",
"data-value" => attrvalue,
-<% n_inputs = cwl_inputs_required(@object, @object.mounts[:"/var/lib/cwl/workflow.json"][:content][:inputs], [:mounts, :"/var/lib/cwl/cwl.input.json", :content]) %>
+<% n_inputs = cwl_inputs_required(@object, get_cwl_inputs(@object.mounts[:"/var/lib/cwl/workflow.json"][:content]), [:mounts, :"/var/lib/cwl/cwl.input.json", :content]) %>
<% content_for :pi_input_form do %>
<form role="form" style="width:60%">
<div class="form-group">
<% workflow = @object.mounts[:"/var/lib/cwl/workflow.json"][:content] %>
- <% workflow[:inputs].each do |input| %>
- <label for="#input-<%= input[:id] %>">
- <%= input[:label] || input[:id] %>
+ <% inputs = get_cwl_inputs(workflow) %>
+ <% inputs.each do |input| %>
+ <label for="#input-<%= cwl_shortname(input[:id]) %>">
+ <%= input[:label] || cwl_shortname(input[:id]) %>
</label>
<div>
<p class="form-control-static">
# So we build this thing separately.
#
# Ward, 2016-03-17
-fpm_build schema_salad "" "" python 1.16.20160810195039
+fpm_build schema_salad "" "" python 1.17.20160820171034
# And schema_salad now depends on ruamel-yaml, which apparently has a braindead setup.py that requires special arguments to build (otherwise, it aborts with 'error: you have to install with "pip install ."'). Sigh.
# Ward, 2016-05-26
# ...and schema_salad 1.12.20160610104117 doesn't work with ruamel-yaml > 0.11.11.
-fpm_build ruamel.yaml "" "" python 0.11.11 --python-setup-py-arguments "--single-version-externally-managed"
+fpm_build ruamel.yaml "" "" python 0.12.4 --python-setup-py-arguments "--single-version-externally-managed"
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20160811184335
+fpm_build cwltool "" "" python 1.0.20160829211335
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
fpm_build rdflib-jsonld "" "" python 0.3.0
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
from cwltool.process import shortname, UnsupportedRequirement
from cwltool.pathmapper import adjustFileObjs
# cwltool.main will write our return value to stdout.
return tmpl.uuid
+ if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+ return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
self.debug = kwargs.get("debug")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
default=True, dest="submit")
exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+ exgroup.add_argument("--update-workflow", type=str, help="Update Arvados workflow.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
job_order_object = None
arvargs = parser.parse_args(args)
- if arvargs.create_template and not arvargs.job_order:
+ if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
job_order_object = ({}, "")
try:
--- /dev/null
+import os
+import json
+import copy
+
+from cwltool.pack import pack
+from cwltool.load_tool import fetch_document
+from cwltool.process import shortname
+
+from .runner import upload_docker, upload_dependencies
+
+# Create or update an Arvados workflow record from a loaded CWL tool.
+#
+# Uploads Docker images and file dependencies, packs the workflow and
+# everything it runs into a single "$graph" document, folds job_order
+# values into "#main" as per-input defaults, and posts the JSON-serialized
+# result to the Arvados workflows API.  Returns the workflow UUID.
+#
+#   arvRunner    -- runner providing .api and .num_retries
+#   tool         -- cwltool Process (.doc_loader, .tool, .metadata)
+#   job_order    -- dict of input values baked in as "default"s
+#   project_uuid -- owner project for a newly created workflow record
+#   update_uuid  -- if set, update this existing workflow instead of creating
+def make_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
+ upload_docker(arvRunner, tool)
+
+ # Re-fetch the raw source document so pack() works from the original text.
+ document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
+
+ # Combine the workflow and its referenced tools into one packed document.
+ packed = pack(document_loader, workflowobj, uri, tool.metadata)
+
+ # Bake the supplied job order into "#main" as input defaults.
+ main = [p for p in packed["$graph"] if p["id"] == "#main"][0]
+ for inp in main["inputs"]:
+ sn = shortname(inp["id"])
+ if sn in job_order:
+ inp["default"] = job_order[sn]
+
+ name = os.path.basename(tool.tool["id"])
+ # Upload referenced files to Keep; locations in `packed` are rewritten
+ # in place to "keep:..." URIs.
+ upload_dependencies(arvRunner, name, document_loader,
+ packed, uri, "keep:", False)
+
+ body = {
+ "workflow": {
+ "owner_uuid": project_uuid,
+ "name": tool.tool.get("label", name),
+ "description": tool.tool.get("doc", ""),
+ "workflow":json.dumps(packed, sort_keys=True, indent=4, separators=(',',': '))
+ }}
+
+ if update_uuid:
+ return arvRunner.api.workflows().update(uuid=update_uuid, body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
+ else:
+ return arvRunner.api.workflows().create(body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+# Scan a CWL document for file/directory dependencies, upload them via
+# ArvPathMapper, and rewrite each object's "location" to its mapped target.
+# Returns the ArvPathMapper so callers can resolve mapped paths.
+#
+#   arvrunner       -- runner object handed to ArvPathMapper
+#   name            -- name for the uploaded dependency set
+#   document_loader -- loader used to fetch $import/run references
+#   workflowobj     -- parsed CWL document (mutated: locations rewritten)
+#   uri             -- base URI of workflowobj
+#   keepprefix      -- prefix applied to mapped locations (e.g. "keep:")
+#   loadref_run     -- if True, also follow "run" references (sub-workflows)
+def upload_dependencies(arvrunner, name, document_loader,
+ workflowobj, uri, keepprefix, loadref_run):
+ loaded = set()
+ def loadref(b, u):
+ joined = urlparse.urljoin(b, u)
+ if joined not in loaded:
+ loaded.add(joined)
+ return document_loader.fetch(urlparse.urljoin(b, u))
+ else:
+ # Already visited: return an empty doc so scandeps does not recurse.
+ return {}
+
+ if loadref_run:
+ loadref_fields = set(("$import", "run"))
+ else:
+ loadref_fields = set(("$import",))
+
+ sc = scandeps(uri, workflowobj,
+ loadref_fields,
+ set(("$include", "$schemas", "path", "location")),
+ loadref)
+
+ # Collect every File/Directory object found by the dependency scan.
+ files = []
+ def visitFiles(path):
+ files.append(path)
+
+ adjustFileObjs(sc, visitFiles)
+ adjustDirObjs(sc, visitFiles)
+
+ normalizeFilesDirs(files)
+
+ # Include the document itself so it is uploaded alongside its deps.
+ if "id" in workflowobj:
+ files.append({"class": "File", "location": workflowobj["id"]})
+
+ mapper = ArvPathMapper(arvrunner, files, "",
+ keepprefix+"%s",
+ keepprefix+"%s/%s",
+ name=name)
+
+ # Rewrite locations in the original document to their mapped targets.
+ def setloc(p):
+ p["location"] = mapper.mapper(p["location"]).target
+ adjustFileObjs(workflowobj, setloc)
+ adjustDirObjs(workflowobj, setloc)
+
+ return mapper
+
+
+# Ensure the Docker image(s) required by `tool` are available in Arvados.
+# CommandLineTools are checked for a DockerRequirement directly; Workflows
+# recurse into each step's embedded tool.
+def upload_docker(arvrunner, tool):
+ if isinstance(tool, CommandLineTool):
+ (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ if docker_req:
+ arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ elif isinstance(tool, cwltool.workflow.Workflow):
+ for s in tool.steps:
+ upload_docker(arvrunner, s.embedded_tool)
+
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
def update_pipeline_component(self, record):
pass
- def upload_docker(self, tool):
- if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
- if docker_req:
- arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- self.upload_docker(s.embedded_tool)
-
-
def arvados_job_spec(self, *args, **kwargs):
- self.upload_docker(self.tool)
-
- workflowfiles = []
- jobfiles = []
- workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
+ upload_docker(self.arvrunner, self.tool)
self.name = os.path.basename(self.tool.tool["id"])
- def visitFiles(files, path):
- files.append(path)
-
- document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
- loaded = set()
- def loadref(b, u):
- joined = urlparse.urljoin(b, u)
- if joined not in loaded:
- loaded.add(joined)
- return document_loader.fetch(urlparse.urljoin(b, u))
- else:
- return {}
-
- sc = scandeps(uri, workflowobj,
- set(("$import", "run")),
- set(("$include", "$schemas", "path", "location")),
- loadref)
- adjustFileObjs(sc, partial(visitFiles, workflowfiles))
- adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
- adjustDirObjs(sc, partial(visitFiles, workflowfiles))
- adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))
-
- normalizeFilesDirs(jobfiles)
- normalizeFilesDirs(workflowfiles)
-
- keepprefix = kwargs.get("keepprefix", "")
- workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- keepprefix+"%s",
- keepprefix+"%s/%s",
- name=self.name,
- **kwargs)
-
- jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- keepprefix+"%s",
- keepprefix+"%s/%s",
- name=os.path.basename(self.job_order.get("id", "#")),
- **kwargs)
-
- def setloc(p):
- p["location"] = jobmapper.mapper(p["location"])[1]
- adjustFileObjs(self.job_order, setloc)
- adjustDirObjs(self.job_order, setloc)
+ workflowmapper = upload_dependencies(self.arvrunner,
+ self.name,
+ self.tool.doc_loader,
+ self.tool.tool,
+ self.tool.tool["id"],
+ kwargs.get("keepprefix", ""),
+ True)
+
+ jobmapper = upload_dependencies(self.arvrunner,
+ os.path.basename(self.job_order.get("id", "#")),
+ self.tool.doc_loader,
+ self.job_order,
+ self.job_order.get("id", "#"),
+ kwargs.get("keepprefix", ""),
+ False)
if "id" in self.job_order:
del self.job_order["id"]
# Make sure to update arvados/build/run-build-packages.sh as well
# when updating the cwltool version pin.
install_requires=[
- 'cwltool==1.0.20160811184335',
+ 'cwltool==1.0.20160829211335',
'arvados-python-client>=0.1.20160714204738',
],
data_files=[
import os
import functools
import cwltool.process
+from schema_salad.ref_resolver import Loader
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
}
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access)
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run",
make_fs_access=make_fs_access, tmpdir="/tmp"):
}
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access)
+ avsc_names=avsc_names, make_fs_access=make_fs_access,
+ loader=Loader({}))
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
make_fs_access=make_fs_access, tmpdir="/tmp"):
import os
import functools
import cwltool.process
+from schema_salad.ref_resolver import Loader
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
}
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names, basedir="", make_fs_access=make_fs_access)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs",
+ avsc_names=avsc_names, basedir="",
+ make_fs_access=make_fs_access,
+ loader=Loader({}))
+
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
j.run()
"baseCommand": "ls"
}
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names, make_fs_access=make_fs_access)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs",
+ avsc_names=avsc_names,
+ make_fs_access=make_fs_access,
+ loader=Loader({}))
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
j.run()
import mock
import sys
import unittest
+import json
from .matcher import JsonDiffMatcher
'ram': 268435456
}
}
+
+ stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
+ stubs.api.workflows().create().execute.return_value = {
+ "uuid": stubs.expect_workflow_uuid,
+ }
+
return func(self, stubs, *args, **kwargs)
return wrapped
mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
'0:0:blub.txt 0:0:submit_tool.cwl\n',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
- 'name': 'New collection',
- 'replication_desired': None,
+ 'name': 'New collection'
}, ensure_unique_name=True),
mock.call().execute(num_retries=4),
mock.call(body={
'0:0:blub.txt 0:0:submit_tool.cwl\n',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
'name': 'New collection',
- 'replication_desired': None,
}, ensure_unique_name=True),
mock.call().execute(num_retries=4),
mock.call(body={
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--create-template", "--no-wait", "--debug",
+ ["--create-template", "--debug",
"--project-uuid", project_uuid,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
stubs.expect_pipeline_template_uuid + '\n')
+# Tests for the --create-workflow CLI path: a workflow record should be
+# created via the workflows API rather than as a pipeline template or
+# container request.
+class TestCreateWorkflow(unittest.TestCase):
+ @stubs
+ def test_create(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--create-workflow", "--debug",
+ "--project-uuid", project_uuid,
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ # Creating a workflow must not touch the legacy submission endpoints.
+ stubs.api.pipeline_templates().create.refute_called()
+ stubs.api.container_requests().create.refute_called()
+
+ # The packed workflow text posted to the API must match the fixture.
+ with open("tests/wf/expect_packed.cwl") as f:
+ expect_workflow = f.read()
+
+ body = {
+ "workflow": {
+ "owner_uuid": project_uuid,
+ "name": "submit_wf.cwl",
+ "description": "",
+ "workflow": expect_workflow
+ }
+ }
+ stubs.api.workflows().create.assert_called_with(
+ body=JsonDiffMatcher(body))
+
+ # The new workflow UUID is written to stdout.
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_workflow_uuid + '\n')
+
+
class TestTemplateInputs(unittest.TestCase):
expect_template = {
"components": {
--- /dev/null
+{
+ "$graph": [
+ {
+ "baseCommand": "cat",
+ "class": "CommandLineTool",
+ "id": "#submit_tool.cwl",
+ "inputs": [
+ {
+ "default": {
+ "class": "File",
+ "location": "keep:99999999999999999999999999999991+99/tool/blub.txt"
+ },
+ "id": "#submit_tool.cwl/x",
+ "inputBinding": {
+ "position": 1
+ },
+ "type": "File"
+ }
+ ],
+ "outputs": [],
+ "requirements": [
+ {
+ "class": "DockerRequirement",
+ "dockerImageId": "debian:8",
+ "dockerPull": "debian:8"
+ }
+ ]
+ },
+ {
+ "class": "Workflow",
+ "id": "#main",
+ "inputs": [
+ {
+ "default": {
+ "basename": "blorp.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999991+99/input/blorp.txt"
+ },
+ "id": "#main/x",
+ "type": "File"
+ }
+ ],
+ "outputs": [],
+ "steps": [
+ {
+ "id": "#main/step1",
+ "in": [
+ {
+ "id": "#main/step1/x",
+ "source": "#main/x"
+ }
+ ],
+ "out": [],
+ "run": "#submit_tool.cwl"
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.0"
+}
\ No newline at end of file