import argparse
import arvados
-import arvados.events
+import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
-import arvados.collection
+import arvados.events
import arvados.util
+import copy
+import cwltool.docker
import cwltool.draft2tool
-import cwltool.workflow
+from cwltool.errors import WorkflowException
import cwltool.main
from cwltool.process import shortname
-from cwltool.errors import WorkflowException
-import threading
-import cwltool.docker
+import cwltool.workflow
import fnmatch
-import logging
-import re
-import os
-import sys
import functools
import json
+import logging
+import os
import pkg_resources # part of setuptools
+import re
+import sys
+import threading
from cwltool.process import get_feature, adjustFiles, scandeps
from arvados.api import OrderedJsonModel
for s in tool.steps:
self.upload_docker(s.embedded_tool)
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ """Create an Arvados job specification for this workflow.
+
+ The returned dict can be used to create a job (i.e., passed as
+ the +body+ argument to jobs().create()), or as a component in
+ a pipeline template or pipeline instance.
+ """
self.upload_docker(self.tool)
workflowfiles = set()
del self.job_order["id"]
self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
-
- response = self.arvrunner.api.jobs().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
+ return {
"script": "cwl-runner",
"script_version": "master",
"repository": "arvados",
"runtime_constraints": {
"docker_image": "arvados/jobs"
}
- }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+ }
+
+ def run(self, *args, **kwargs):
+ job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
- self.arvrunner.jobs[response["uuid"]] = self
+ response = self.arvrunner.api.jobs().create(
+ body=job_spec,
+ find_or_create=self.enable_reuse
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.uuid = response["uuid"]
+ self.arvrunner.jobs[self.uuid] = self
logger.info("Submitted job %s", response["uuid"])
finally:
del self.arvrunner.jobs[record["uuid"]]
+
class RunnerTemplate(object):
    """An Arvados pipeline template that invokes a CWL workflow."""

    # Maps a CWL primitive input type to the "dataclass" that Workbench
    # uses to render the corresponding pipeline template parameter.
    type_to_dataclass = {
        'boolean': 'boolean',
        'File': 'File',
        'float': 'number',
        'int': 'number',
        'string': 'text',
    }

    def __init__(self, runner, tool, job_order, enable_reuse):
        """Wrap the workflow and job order in a RunnerJob.

        runner -- the executor; save() uses its api, project_uuid and
                  num_retries attributes.
        tool -- the loaded CWL tool/workflow; tool.tool['inputs'] is read
                to build the template parameters.
        job_order -- input values, used as the template's default values.
        enable_reuse -- passed through to RunnerJob.
        """
        self.runner = runner
        self.tool = tool
        self.job = RunnerJob(
            runner=runner,
            tool=tool,
            job_order=job_order,
            enable_reuse=enable_reuse)
        # Set by save() to the UUID of the created pipeline template.
        self.uuid = None

    @classmethod
    def _dataclass_for(cls, types):
        """Return the Workbench dataclass for a list of CWL types, or None.

        A dataclass is chosen only when exactly one distinct non-null type
        remains and it is a primitive listed in type_to_dataclass. Complex
        type schemas (array/record/enum) are dicts. They are unhashable, so
        they are compared by equality and never used as lookup keys.
        """
        non_null = []
        for cwl_type in types:
            if cwl_type != 'null' and cwl_type not in non_null:
                non_null.append(cwl_type)
        if len(non_null) != 1 or isinstance(non_null[0], (dict, list)):
            return None
        return cls.type_to_dataclass.get(non_null[0])

    def _component_param(self, param, job_params):
        """Translate one CWL input declaration into a template parameter.

        param -- a CWL input parameter dict. It is deep-copied, never
                 modified in place.
        job_params -- the job order's script_parameters. The value found
                      under the parameter's short name (if any) becomes
                      the parameter's "value".

        Returns a (param_id, parameter_spec) tuple.
        """
        param = copy.deepcopy(param)

        # Data type and "required" flag. The original "type" attribute is
        # always kept, which may help downstream consumers when no single
        # dataclass applies.
        types = param['type']
        if not isinstance(types, list):
            types = [types]
        param['required'] = 'null' not in types
        dataclass = self._dataclass_for(types)
        if dataclass:
            param['dataclass'] = dataclass

        # Title and description. A CWL label/description may be absent or
        # explicitly null; both are treated as "not provided".
        title = param.pop('label', None) or ''
        descr = (param.pop('description', None) or '').rstrip('\n')
        if title:
            param['title'] = title
        if descr:
            param['description'] = descr

        # Fill in the value from the current job order, if any.
        param_id = shortname(param.pop('id'))
        value = job_params.get(param_id)
        if isinstance(value, dict):
            # Only a File-typed parameter can take a File object's path.
            if param.get('dataclass') == 'File' and value.get('path'):
                param['value'] = value['path']
        elif value is not None:
            param['value'] = value
        return param_id, param

    def pipeline_component_spec(self):
        """Return a component that Workbench and arv-run-pipeline-instance
        will understand.

        Specifically, translate CWL input specs to Arvados pipeline
        format, like {"dataclass":"File","value":"xyz"}.
        """
        spec = self.job.arvados_job_spec()

        # Most of the component spec is exactly the same as the job spec
        # (script, script_version, etc.). script_parameters is rebuilt
        # from self.tool.tool['inputs'], with values taken from the job
        # order.
        job_params = spec['script_parameters']
        script_parameters = {}
        for param in self.tool.tool['inputs']:
            param_id, param_spec = self._component_param(param, job_params)
            script_parameters[param_id] = param_spec
        script_parameters['cwl:tool'] = job_params['cwl:tool']
        spec['script_parameters'] = script_parameters
        return spec

    def save(self):
        """Create the pipeline template through the API.

        The new template's UUID is stored in self.uuid.
        """
        job_spec = self.pipeline_component_spec()
        response = self.runner.api.pipeline_templates().create(body={
            "components": {
                self.job.name: job_spec,
            },
            "name": self.job.name,
            "owner_uuid": self.runner.project_uuid,
        }).execute(num_retries=self.runner.num_retries)
        self.uuid = response["uuid"]
        logger.info("Created template %s", self.uuid)
+
+
class ArvPathMapper(cwltool.pathmapper.PathMapper):
"""Convert container-local paths to and from Keep collection ids."""
body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.final_output = out
-
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.jobs and event["event_type"] == "update":
self.project_uuid = args.project_uuid if args.project_uuid else useruuid
self.pipeline = None
+ if args.create_template:
+ tmpl = RunnerTemplate(self, tool, job_order, args.enable_reuse)
+ tmpl.save()
+ # cwltool.main will write our return value to stdout.
+ return tmpl.uuid
+
if args.submit:
runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
if not args.wait:
runnerjob.run()
- return
+ return runnerjob.uuid
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
default=True, dest="submit")
exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
default=True, dest="submit")
+ exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
import arvados.keep
import arvados.collection
import arvados_cwl
+import copy
+import cStringIO
import functools
import hashlib
import mock
import sys
import unittest
+from .matcher import JsonDiffMatcher
+
+
def stubs(func):
@functools.wraps(func)
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
stubs.api = mock.MagicMock()
- stubs.api.users().current().execute.return_value = {"uuid": stubs.fake_user_uuid}
+ stubs.api.users().current().execute.return_value = {
+ "uuid": stubs.fake_user_uuid,
+ }
stubs.api.collections().list().execute.return_value = {"items": []}
stubs.api.collections().create().execute.side_effect = ({
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"portable_data_hash": "99999999999999999999999999999992+99",
})
+ stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
stubs.api.jobs().create().execute.return_value = {
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+ "uuid": stubs.expect_job_uuid,
"state": "Queued",
}
+ stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.api.pipeline_templates().create().execute.return_value = {
+ "uuid": stubs.expect_pipeline_template_uuid,
+ }
stubs.expect_job_spec = {
- 'owner_uuid': stubs.fake_user_uuid,
'runtime_constraints': {
'docker_image': 'arvados/jobs'
},
'path': '99999999999999999999999999999992+99/blorp.txt',
'class': 'File'
},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'cwl:tool':
+ '99999999999999999999999999999991+99/wf/submit_wf.cwl'
},
'repository': 'arvados',
'script_version': 'master',
class TestSubmit(unittest.TestCase):
    """Submitting a workflow runner job with --submit --no-wait."""

    @stubs
    def test_submit(self, stubs):
        """Inputs are uploaded, one runner job is created, and its UUID is
        printed on stdout."""
        out = cStringIO.StringIO()
        cli_args = ["--submit", "--no-wait",
                    "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"]
        exit_code = arvados_cwl.main(cli_args, out, sys.stderr,
                                     api_client=stubs.api)
        self.assertEqual(exit_code, 0)

        owner = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        tool_manifest = (
            './tool 84ec4df683711de31b782505389a8843+429 '
            '0:16:blub.txt 16:413:submit_tool.cwl\n./wf '
            '81d977a245a41b8e79859fbe00623fd0+344 0:344:submit_wf.cwl\n')
        input_manifest = (
            '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n')
        expected_collection_calls = [
            mock.call(),
            mock.call(body={
                'manifest_text': tool_manifest,
                'owner_uuid': owner,
                'name': 'submit_wf.cwl',
            }, ensure_unique_name=True),
            mock.call().execute(),
            mock.call(body={
                'manifest_text': input_manifest,
                'owner_uuid': owner,
                'name': '#',
            }, ensure_unique_name=True),
            mock.call().execute(),
        ]
        stubs.api.collections().create.assert_has_calls(
            expected_collection_calls)

        job_body = copy.deepcopy(stubs.expect_job_spec)
        job_body["owner_uuid"] = stubs.fake_user_uuid
        stubs.api.jobs().create.assert_called_with(
            body=job_body, find_or_create=True)
        self.assertEqual(out.getvalue(), stubs.expect_job_uuid + '\n')

    @stubs
    def test_submit_with_project_uuid(self, stubs):
        """--project-uuid becomes the owner of the submitted job."""
        target_project = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
        cli_args = ["--submit", "--no-wait",
                    "--project-uuid", target_project,
                    "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"]
        exit_code = arvados_cwl.main(cli_args, sys.stdout, sys.stderr,
                                     api_client=stubs.api)
        self.assertEqual(exit_code, 0)

        job_body = copy.deepcopy(stubs.expect_job_spec)
        job_body["owner_uuid"] = target_project
        stubs.api.jobs().create.assert_called_with(
            body=job_body, find_or_create=True)
+
+
class TestCreateTemplate(unittest.TestCase):
    """Creating a pipeline template with --create-template."""

    @stubs
    def test_create(self, stubs):
        """A template is created in the requested project and its UUID is
        printed; no job or pipeline instance is created."""
        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'

        capture_stdout = cStringIO.StringIO()

        exited = arvados_cwl.main(
            ["--create-template", "--no-wait",
             "--project-uuid", project_uuid,
             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
            capture_stdout, sys.stderr, api_client=stubs.api)
        self.assertEqual(exited, 0)

        # MagicMock silently accepts unknown attributes such as
        # "refute_called", so inspect the recorded calls instead. The stub
        # setup itself calls create() with no arguments to configure the
        # return value; any real API request would carry a body.
        for create in (stubs.api.pipeline_instances().create,
                       stubs.api.jobs().create):
            self.assertEqual(
                [c for c in create.call_args_list if c != mock.call()], [])

        expect_component = copy.deepcopy(stubs.expect_job_spec)
        expect_component['script_parameters']['x'] = {
            'dataclass': 'File',
            'required': True,
            'type': 'File',
            'value': '99999999999999999999999999999992+99/blorp.txt',
        }
        expect_template = {
            "components": {
                "submit_wf.cwl": expect_component,
            },
            "name": "submit_wf.cwl",
            "owner_uuid": project_uuid,
        }
        stubs.api.pipeline_templates().create.assert_called_with(
            body=JsonDiffMatcher(expect_template))

        self.assertEqual(capture_stdout.getvalue(),
                         stubs.expect_pipeline_template_uuid + '\n')
+
+
class TestTemplateInputs(unittest.TestCase):
    """Translation of CWL input declarations into template parameters."""

    # Expected template for tests/wf/inputs_test.cwl with an empty job
    # order, minus "owner_uuid". Shared by every test in this class: use
    # _expected_template() to get a private copy, never modify in place.
    expect_template = {
        "components": {
            "inputs_test.cwl": {
                'runtime_constraints': {
                    'docker_image': 'arvados/jobs',
                },
                'script_parameters': {
                    'cwl:tool':
                    '99999999999999999999999999999991+99/'
                    'wf/inputs_test.cwl',
                    'fileInput': {
                        'type': 'File',
                        'dataclass': 'File',
                        'required': True,
                        'title': "It's a file; we expect to find some characters in it.",
                        'description': 'If there were anything further to say, it would be said here,\nor here.'
                    },
                    'floatInput': {
                        'type': 'float',
                        'dataclass': 'number',
                        'required': True,
                        'title': 'Floats like a duck',
                        'default': 0.1,
                        'value': 0.1,
                    },
                    'optionalFloatInput': {
                        'type': ['null', 'float'],
                        'dataclass': 'number',
                        'required': False,
                    },
                    'boolInput': {
                        'type': 'boolean',
                        'dataclass': 'boolean',
                        'required': True,
                        'title': 'True or false?',
                    },
                },
                'repository': 'arvados',
                'script_version': 'master',
                'script': 'cwl-runner',
            },
        },
        "name": "inputs_test.cwl",
    }

    def _expected_template(self, stubs):
        """Return a deep copy of expect_template owned by the stub user."""
        expect_template = copy.deepcopy(self.expect_template)
        expect_template["owner_uuid"] = stubs.fake_user_uuid
        return expect_template

    def _create_template(self, stubs, order_path):
        """Run --create-template on inputs_test.cwl and check it succeeds."""
        exited = arvados_cwl.main(
            ["--create-template", "--no-wait",
             "tests/wf/inputs_test.cwl", order_path],
            cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
        self.assertEqual(exited, 0)

    @stubs
    def test_inputs_empty(self, stubs):
        self._create_template(stubs, "tests/order/empty_order.json")

        stubs.api.pipeline_templates().create.assert_called_with(
            body=JsonDiffMatcher(self._expected_template(stubs)))

    @stubs
    def test_inputs(self, stubs):
        self._create_template(stubs, "tests/order/inputs_test_order.json")

        expect_template = self._expected_template(stubs)
        params = expect_template[
            "components"]["inputs_test.cwl"]["script_parameters"]
        params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
        params["floatInput"]["value"] = 1.234
        params["boolInput"]["value"] = True

        stubs.api.pipeline_templates().create.assert_called_with(
            body=JsonDiffMatcher(expect_template))