from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_instance
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
for p in proc_states["items"]:
self.on_message({
- "object_uuid": p["uuid"],
+ "object_uuid": p["uuid"],
"event_type": "update",
"properties": {
"new_attributes": p
keep_client=self.keep_client)
self.fs_access = make_fs_access(kwargs["basedir"])
+ if not kwargs.get("name"):
+ kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ upload_workflow_deps(self, tool)
+
+ # Reload tool object which may have been updated by
+ # upload_workflow_deps
+ tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+ makeTool=self.arv_make_tool,
+ loader=tool.doc_loader,
+ avsc_names=tool.doc_schema,
+ metadata=tool.metadata)
+
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ tool, job_order)
+
existing_uuid = kwargs.get("update_workflow")
if existing_uuid or kwargs.get("create_workflow"):
+ # Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"))
+ name=kwargs["name"])
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
- else:
+ elif self.work_api == "containers":
return (upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name")), "success")
+ self.project_uuid,
+ uuid=existing_uuid,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"]),
+ "success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
kwargs["tmpdir_prefix"] = "tmp"
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
- if not kwargs["name"]:
- del kwargs["name"]
-
if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
-
runnerjob = None
if kwargs.get("submit"):
+ # Submit a runner job to run the workflow for us.
if self.work_api == "containers":
if tool.tool["class"] == "CommandLineTool":
kwargs["runnerjob"] = tool.tool["id"]
+ upload_dependencies(self,
+ kwargs["name"],
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ False)
runnerjob = tool.job(job_order,
self.output_callback,
**kwargs).next()
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"))
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
from cwltool.errors import WorkflowException
from cwltool.process import get_feature, UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFiles
+from cwltool.pathmapper import adjustFiles, adjustDirObjs
from cwltool.utils import aslist
import arvados.collection
from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .fsaccess import CollectionFetcher
logger = logging.getLogger('arvados.cwl-runner')
the +body+ argument to container_requests().create().
"""
- workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = packed_workflow(self.arvrunner, self.tool)
+
+ adjustDirObjs(self.job_order, trim_listing)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
"properties": {}
}
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- if workflowcollection.startswith("keep:"):
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- container_req["mounts"]["/var/lib/cwl/workflow"] = {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- }
- elif workflowcollection.startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- wfuuid = workflowcollection[6:workflowcollection.index("#")]
- wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
- wfobj = yaml.safe_load(wfrecord["definition"])
- if container_req["name"].startswith("arvwf:"):
- container_req["name"] = wfrecord["name"]
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": wfobj
- }
- container_req["properties"]["template_uuid"] = wfuuid
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if self.tool.tool.get("id", "").startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+from cwltool.pathmapper import adjustDirObjs
+
+import ruamel.yaml as yaml
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+    def upload_workflow_collection(self, packed):
+        """Save the packed workflow document to Keep as "workflow.cwl".
+
+        Reuses an existing collection in the target project when one with
+        the same portable data hash and a name starting with self.name is
+        already present, so repeated submissions do not pile up duplicate
+        collections.  Returns the collection's portable data hash.
+        """
+        collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                   keep_client=self.arvrunner.keep_client,
+                                                   num_retries=self.arvrunner.num_retries)
+        with collection.open("workflow.cwl", "w") as f:
+            f.write(yaml.round_trip_dump(packed))
+
+        # Content-addressed lookup: identical content yields the same
+        # portable_data_hash, so a match means the workflow is already uploaded.
+        exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                                                ["portable_data_hash", "=", collection.portable_data_hash()],
+                                                                ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+        if exists["items"]:
+            logger.info("Using collection %s", exists["items"][0]["uuid"])
+        else:
+            collection.save_new(name=self.name,
+                                owner_uuid=self.arvrunner.project_uuid,
+                                ensure_unique_name=True,
+                                num_retries=self.arvrunner.num_retries)
+            logger.info("Uploaded to %s", collection.manifest_locator())
+
+        # Same content either way, so the PDH is valid whether or not we saved.
+        return collection.portable_data_hash()
+
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
a pipeline template or pipeline instance.
"""
- workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ if self.tool.tool["id"].startswith("keep:"):
+ pass
+ else:
+ packed = packed_workflow(self.arvrunner, self.tool)
+ wf_pdh = self.upload_workflow_collection(packed)
+ self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+ adjustDirObjs(self.job_order, trim_listing)
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
import ruamel.yaml as yaml
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
from .arvtool import ArvadosCommandTool
from .perf import Perf
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None):
- upload_docker(arvRunner, tool)
- document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
- packed = pack(document_loader, workflowobj, uri, tool.metadata)
+ packed = packed_workflow(arvRunner, tool)
adjustDirObjs(job_order, trim_listing)
if not name:
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
- upload_dependencies(arvRunner, name, document_loader,
- packed, uri, False)
+ upload_dependencies(arvRunner, name, tool.doc_loader,
+ packed, tool.tool["id"], False)
# TODO nowhere for submit_runner_ram to go.
with self.fsaccess.open(url, "r") as f:
return f.read()
if url.startswith("arvwf:"):
- return self.api_client.workflows().get(uuid=url[6:]).execute()["definition"]
+ record = self.api_client.workflows().get(uuid=url[6:]).execute()
+ definition = record["definition"] + ('\nlabel: "%s"\n' % record["name"].replace('"', '\\"'))
+ return definition
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
import logging
import json
import re
+import subprocess
+
from cStringIO import StringIO
from schema_salad.sourceline import SourceLine
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
+from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run):
+ workflowobj, uri, loadref_run, include_primary=True):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
normalizeFilesDirs(sc)
- if "id" in workflowobj:
+ if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
mapper = ArvPathMapper(arvrunner, sc, "",
def upload_docker(arvrunner, tool):
+ """Visitor which uploads Docker images referenced in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
- upload_docker(arvrunner, tool)
-
- for t in tool.tool["inputs"]:
- def setSecondary(fileobj):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
-
- if isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(e)
-
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(job_order[shortname(t["id"])])
-
- workflowmapper = upload_dependencies(arvrunner,
- name,
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
- True)
- jobmapper = upload_dependencies(arvrunner,
- os.path.basename(job_order.get("id", "#")),
- tool.doc_loader,
- job_order,
- job_order.get("id", "#"),
- False)
-
- if "id" in job_order:
- del job_order["id"]
-
- return workflowmapper
+
+def packed_workflow(arvrunner, tool):
+    """Create a packed workflow.
+
+    A "packed" workflow is one where all the components have been
+    combined into a single document.  Re-fetches the tool's document via
+    its loader and returns the result of cwltool's pack().
+
+    NOTE: arvrunner is currently unused; it is accepted for signature
+    consistency with the other upload_* helpers in this module.
+    """
+
+    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+                tool.tool["id"], tool.metadata)
+
+def tag_git_version(packed):
+ if tool.tool["id"].startswith("file://"):
+ path = os.path.dirname(tool.tool["id"][7:])
+ try:
+ githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+ except (OSError, subprocess.CalledProcessError):
+ pass
+ else:
+ packed["http://schema.org/version"] = githash
+
+
+def upload_job_order(arvrunner, name, tool, job_order):
+    """Upload local files referenced in the input object and return updated input
+    object with 'location' updated to the proper keep references.
+    """
+
+    # Fill in missing secondaryFiles on File inputs from the patterns
+    # declared on the corresponding tool input, so the secondary files
+    # are included in the upload alongside the primary file.
+    for t in tool.tool["inputs"]:
+        def setSecondary(fileobj):
+            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                if "secondaryFiles" not in fileobj:
+                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+            if isinstance(fileobj, list):
+                # Array-of-File input: apply to each element.
+                for e in fileobj:
+                    setSecondary(e)
+
+        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+            setSecondary(job_order[shortname(t["id"])])
+
+    # Uploads the referenced files and rewrites their 'location' fields
+    # in job_order to keep: references (side effect of upload_dependencies).
+    jobmapper = upload_dependencies(arvrunner,
+                                    name,
+                                    tool.doc_loader,
+                                    job_order,
+                                    job_order.get("id", "#"),
+                                    False)
+
+    if "id" in job_order:
+        del job_order["id"]
+
+    # Need to filter this out, gets added by cwltool when providing
+    # parameters on the command line.
+    if "job_order" in job_order:
+        del job_order["job_order"]
+
+    return job_order
+
+def upload_workflow_deps(arvrunner, tool):
+    """Upload Docker images and file dependencies for every tool in the workflow.
+
+    Visits each embedded tool, uploads its direct (non-primary) file
+    dependencies to Keep, and updates the loader's index in place so a
+    subsequent reload of the tool picks up the rewritten references.
+    """
+    # Ensure that Docker images needed by this workflow are available
+    tool.visit(partial(upload_docker, arvrunner))
+
+    document_loader = tool.doc_loader
+
+    def upload_tool_deps(deptool):
+        # Upload only the dependencies, not the tool document itself
+        # (include_primary=False).
+        upload_dependencies(arvrunner,
+                            "%s dependencies" % (shortname(deptool["id"])),
+                            document_loader,
+                            deptool,
+                            deptool["id"],
+                            False,
+                            include_primary=False)
+        # Replace the cached document so the updated locations are used
+        # when the tool object is re-created from the loader index.
+        document_loader.idx[deptool["id"]] = deptool
+
+    tool.visit(upload_tool_deps)
def arvados_jobs_image(arvrunner, img):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
return img
class Runner(object):
+ """Base class for runner processes, which submit an instance of
+ arvados-cwl-runner and wait for the final result."""
+
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None):
def update_pipeline_component(self, record):
pass
- def arvados_job_spec(self, *args, **kwargs):
- if self.name is None:
- self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
-
- # Need to filter this out, gets added by cwltool when providing
- # parameters on the command line.
- if "job_order" in self.job_order:
- del self.job_order["job_order"]
-
- workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
- adjustDirObjs(self.job_order, trim_listing)
- return workflowmapper
-
def done(self, record):
+ """Base method for handling a completed runner."""
+
try:
if record["state"] == "Complete":
if record.get("exit_code") is not None:
from .matcher import JsonDiffMatcher
from .mock_discovery import get_rootDesc
+import ruamel.yaml as yaml
+
_rootDesc = None
def stubs(func):
'script_parameters': {
'x': {
'basename': 'blorp.txt',
- 'location': 'keep:99999999999999999999999999999994+99/blorp.txt',
+ 'location': 'keep:99999999999999999999999999999992+99/blorp.txt',
'class': 'File'
},
'y': {
'class': 'Directory'
},
'cwl:tool':
- '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__, 'min_ram_mb_per_node': 1024},
'script_parameters': {
'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999994+99/blorp.txt'}},
+ 'x': {"value": {'basename': 'blorp.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999992+99/blorp.txt'}},
'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+ 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
stubs.expect_container_spec = {
'priority': 1,
'mounts': {
'writable': True,
'kind': 'collection'
},
- '/var/lib/cwl/workflow': {
- 'portable_data_hash': '99999999999999999999999999999991+99',
- 'kind': 'collection'
+ '/var/lib/cwl/workflow.json': {
+ 'json': expect_packed_workflow,
+ 'kind': 'json'
},
'stdout': {
'path': '/var/spool/cwl/cwl.output.json',
'kind': 'json',
'content': {
'y': {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'},
- 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999994+99/blorp.txt'},
+ 'x': {'basename': u'blorp.txt', 'class': 'File', 'location': u'keep:99999999999999999999999999999992+99/blorp.txt'},
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'replication_desired': None,
- 'name': 'New collection'
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
- }, ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input',
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
stubs.api.pipeline_instances().create.assert_called_with(
- body=expect_pipeline)
+ body=JsonDiffMatcher(expect_pipeline))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
stubs.api.collections().create.assert_has_calls([
mock.call(),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
- './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
- '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
- 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'owner_uuid': None,
- 'name': 'submit_wf.cwl',
- }, ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies',
+ }), ensure_unique_name=True),
mock.call().execute(),
- mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
- '0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'name': 'New collection',
- 'replication_desired': None,
- }, ensure_unique_name=True),
- mock.call().execute(num_retries=4),
- mock.call(body={
+ mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'owner_uuid': None,
- 'name': '#',
- }, ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input',
+ }), ensure_unique_name=True),
mock.call().execute()])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--disable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=stop',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-name="+output_name, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_spec["output_name"] = output_name
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
"--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
'json': {
'cwlVersion': 'v1.0',
'$graph': [
+ {
+ 'id': '#main',
+ 'inputs': [
+ {'type': 'string', 'id': '#main/x'}
+ ],
+ 'steps': [
+ {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
+ 'run': '#submit_tool.cwl',
+ 'id': '#main/step1',
+ 'out': []}
+ ],
+ 'class': 'Workflow',
+ 'outputs': []
+ },
{
'inputs': [
{
'outputs': [],
'baseCommand': 'cat',
'class': 'CommandLineTool'
- }, {
- 'id': '#main',
- 'inputs': [
- {'type': 'string', 'id': '#main/x'}
- ],
- 'steps': [
- {'in': [{'source': '#main/x', 'id': '#main/step1/x'}],
- 'run': '#submit_tool.cwl',
- 'id': '#main/step1',
- 'out': []}
- ],
- 'class': 'Workflow',
- 'outputs': []
}
]
}
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
- body=expect_container)
+ body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
'dataclass': 'File',
'required': True,
'type': 'File',
- 'value': '99999999999999999999999999999994+99/blorp.txt',
+ 'value': '99999999999999999999999999999992+99/blorp.txt',
}
expect_component['script_parameters']['y'] = {
'dataclass': 'Collection',
expect_template = copy.deepcopy(self.expect_template)
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
+ params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True
+cwlVersion: v1.0
$graph:
-- baseCommand: cat
- class: CommandLineTool
- id: '#submit_tool.cwl'
- inputs:
- - id: '#submit_tool.cwl/x'
- inputBinding: {position: 1}
- type: string
- outputs: []
- requirements:
- - {class: DockerRequirement, dockerPull: 'debian:8'}
- class: Workflow
id: '#main'
inputs:
- {id: '#main/step1/x', source: '#main/x'}
out: []
run: '#submit_tool.cwl'
-cwlVersion: v1.0
+- baseCommand: cat
+ class: CommandLineTool
+ id: '#submit_tool.cwl'
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ inputBinding: {position: 1}
+ type: string
+ outputs: []
+ requirements:
+ - {class: DockerRequirement, dockerPull: 'debian:8'}
type: File
default:
class: File
- location: keep:99999999999999999999999999999991+99/tool/blub.txt
+ location: keep:99999999999999999999999999999991+99/blub.txt
inputBinding:
position: 1
outputs: []
inputs:
- id: '#main/x'
type: File
- default: {class: File, location: 'keep:99999999999999999999999999999991+99/input/blorp.txt',
+ default: {class: File, location: 'keep:99999999999999999999999999999992+99/blorp.txt',
basename: blorp.txt}
- id: '#main/y'
type: Directory
--- /dev/null
+cwlVersion: v1.0
+$graph:
+- class: CommandLineTool
+ requirements:
+ - class: DockerRequirement
+ dockerPull: debian:8
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ type: File
+ default:
+ class: File
+ location: keep:99999999999999999999999999999991+99/blub.txt
+ inputBinding:
+ position: 1
+ outputs: []
+ baseCommand: cat
+ id: '#submit_tool.cwl'
+- class: Workflow
+ inputs:
+ - id: '#main/x'
+ type: File
+ - id: '#main/y'
+ type: Directory
+ - id: '#main/z'
+ type: Directory
+ outputs: []
+ steps:
+ - id: '#main/step1'
+ in:
+ - {id: '#main/step1/x', source: '#main/x'}
+ out: []
+ run: '#submit_tool.cwl'
+ id: '#main'