8653: Add arvados-cwl-runner --create-template flag
author Tom Clegg <tom@curoverse.com>
Wed, 27 Apr 2016 18:58:11 +0000 (14:58 -0400)
committer Tom Clegg <tom@curoverse.com>
Mon, 2 May 2016 20:27:11 +0000 (16:27 -0400)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/setup.py
sdk/cwl/tests/matcher.py [new file with mode: 0644]
sdk/cwl/tests/order/empty_order.json [new file with mode: 0644]
sdk/cwl/tests/order/inputs_test_order.json [new file with mode: 0644]
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/inputs_test.cwl [new file with mode: 0644]

index 8f2102c6c32aa6d21f4532f246b1524946878372..8341624b74076fc720fc769f6515a887ac9f370f 100644 (file)
@@ -4,26 +4,27 @@
 
 import argparse
 import arvados
-import arvados.events
+import arvados.collection
 import arvados.commands.keepdocker
 import arvados.commands.run
-import arvados.collection
+import arvados.events
 import arvados.util
+import copy
+import cwltool.docker
 import cwltool.draft2tool
-import cwltool.workflow
+from cwltool.errors import WorkflowException
 import cwltool.main
 from cwltool.process import shortname
-from cwltool.errors import WorkflowException
-import threading
-import cwltool.docker
+import cwltool.workflow
 import fnmatch
-import logging
-import re
-import os
-import sys
 import functools
 import json
+import logging
+import os
 import pkg_resources  # part of setuptools
+import re
+import sys
+import threading
 
 from cwltool.process import get_feature, adjustFiles, scandeps
 from arvados.api import OrderedJsonModel
@@ -322,7 +323,13 @@ class RunnerJob(object):
             for s in tool.steps:
                 self.upload_docker(s.embedded_tool)
 
-    def run(self, dry_run=False, pull_image=True, **kwargs):
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
         self.upload_docker(self.tool)
 
         workflowfiles = set()
@@ -364,9 +371,7 @@ class RunnerJob(object):
             del self.job_order["id"]
 
         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
-
-        response = self.arvrunner.api.jobs().create(body={
-            "owner_uuid": self.arvrunner.project_uuid,
+        return {
             "script": "cwl-runner",
             "script_version": "master",
             "repository": "arvados",
@@ -374,9 +379,19 @@ class RunnerJob(object):
             "runtime_constraints": {
                 "docker_image": "arvados/jobs"
             }
-        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+        }
+
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
 
-        self.arvrunner.jobs[response["uuid"]] = self
+        response = self.arvrunner.api.jobs().create(
+            body=job_spec,
+            find_or_create=self.enable_reuse
+        ).execute(num_retries=self.arvrunner.num_retries)
+
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[self.uuid] = self
 
         logger.info("Submitted job %s", response["uuid"])
 
@@ -401,6 +416,99 @@ class RunnerJob(object):
         finally:
             del self.arvrunner.jobs[record["uuid"]]
 
+
+class RunnerTemplate(object):
+    """An Arvados pipeline template that invokes a CWL workflow."""
+
+    type_to_dataclass = {
+        'boolean': 'boolean',
+        'File': 'File',
+        'float': 'number',
+        'int': 'number',
+        'string': 'text',
+    }
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        """Wrap a RunnerJob whose job spec becomes the template component."""
+        self.runner = runner
+        self.tool = tool
+        self.job = RunnerJob(
+            runner=runner,
+            tool=tool,
+            job_order=job_order,
+            enable_reuse=enable_reuse)
+
+    def pipeline_component_spec(self):
+        """Return a component that Workbench and a-r-p-i will understand.
+
+        Specifically, translate CWL input specs to Arvados pipeline
+        format, like {"dataclass":"File","value":"xyz"}.
+        """
+        spec = self.job.arvados_job_spec()
+
+        # The component spec matches the job spec (script, etc.) except
+        # for script_parameters, which we rebuild from
+        # self.tool.tool['inputs'] with values taken from the job order.
+        job_params = spec['script_parameters']
+        spec['script_parameters'] = {}
+
+        for param in self.tool.tool['inputs']:
+            param = copy.deepcopy(param)
+
+            # Data type and "required" flag. Types may be schema dicts
+            # (array/record/enum), which are unhashable: avoid set().
+            types = param['type']
+            if not isinstance(types, list):
+                types = [types]
+            param['required'] = 'null' not in types
+            non_null_types = []
+            for t in types:
+                if t != 'null' and t not in non_null_types:
+                    non_null_types.append(t)
+            if (len(non_null_types) == 1 and
+                    not isinstance(non_null_types[0], (dict, list))):
+                dataclass = self.type_to_dataclass.get(non_null_types[0])
+                if dataclass:
+                    param['dataclass'] = dataclass
+            # Otherwise "dataclass" is left out; "type" is kept in any
+            # case, which might help downstream.
+
+            # Title and description (either may be missing or null)...
+            title = param.pop('label', None) or ''
+            descr = (param.pop('description', None) or '').rstrip('\n')
+            if title:
+                param['title'] = title
+            if descr:
+                param['description'] = descr
+
+            # Fill in the value from the current job order, if any.
+            param_id = shortname(param.pop('id'))
+            value = job_params.get(param_id)
+            if value is None:
+                pass
+            elif not isinstance(value, dict):
+                param['value'] = value
+            elif param.get('dataclass') == 'File' and value.get('path'):
+                param['value'] = value['path']
+
+            spec['script_parameters'][param_id] = param
+        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+        return spec
+
+    def save(self):
+        """Create the pipeline template and store its uuid in self.uuid."""
+        job_spec = self.pipeline_component_spec()
+        response = self.runner.api.pipeline_templates().create(body={
+            "components": {
+                self.job.name: job_spec,
+            },
+            "name": self.job.name,
+            "owner_uuid": self.runner.project_uuid,
+        }).execute(num_retries=self.runner.num_retries)
+        self.uuid = response["uuid"]
+        logger.info("Created template %s", self.uuid)
+
+
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
@@ -502,7 +610,6 @@ class ArvCwlRunner(object):
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
         self.final_output = out
 
-
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.jobs and event["event_type"] == "update":
@@ -541,11 +648,17 @@ class ArvCwlRunner(object):
         self.project_uuid = args.project_uuid if args.project_uuid else useruuid
         self.pipeline = None
 
+        if args.create_template:
+            tmpl = RunnerTemplate(self, tool, job_order, args.enable_reuse)
+            tmpl.save()
+            # cwltool.main will write our return value to stdout.
+            return tmpl.uuid
+
         if args.submit:
             runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
             if not args.wait:
                 runnerjob.run()
-                return
+                return runnerjob.uuid
 
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
@@ -655,6 +768,7 @@ def main(args, stdout, stderr, api_client=None):
                         default=True, dest="submit")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
+    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
index 149c0bae860388dc37e9c6881318ae65807ab17c..c665a00f798d296d1ea984789f9b785a0a50673e 100644 (file)
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool==1.0.20160421140153',
+          'cwltool==1.0.20160427142240',
           'arvados-python-client>=0.1.20160322001610'
       ],
       test_suite='tests',
diff --git a/sdk/cwl/tests/matcher.py b/sdk/cwl/tests/matcher.py
new file mode 100644 (file)
index 0000000..d3c9316
--- /dev/null
@@ -0,0 +1,31 @@
+import difflib
+import json
+
+
+class JsonDiffMatcher(object):
+    """Raise AssertionError with a readable JSON diff when not __eq__().
+
+    Used with assert_called_with() so it's possible for a human to see
+    the differences between expected and actual call arguments that
+    include non-trivial data structures.
+    """
+    def __init__(self, expected):
+        self.expected = expected
+
+    def __eq__(self, actual):
+        expected_json = json.dumps(self.expected, sort_keys=True, indent=2)
+        actual_json = json.dumps(actual, sort_keys=True, indent=2)
+        if expected_json != actual_json:
+            raise AssertionError("".join(difflib.context_diff(
+                expected_json.splitlines(True),
+                actual_json.splitlines(True),
+                fromfile="Expected", tofile="Actual")))
+        return True
+
+    def __ne__(self, actual):
+        # Python 2 does not derive != from __eq__, so spell it out.
+        return not self.__eq__(actual)
+
+    def __repr__(self):
+        # Shown by mock when a call mismatches on other arguments.
+        return "JsonDiffMatcher(%r)" % (self.expected,)
diff --git a/sdk/cwl/tests/order/empty_order.json b/sdk/cwl/tests/order/empty_order.json
new file mode 100644 (file)
index 0000000..0967ef4
--- /dev/null
@@ -0,0 +1 @@
+{}
diff --git a/sdk/cwl/tests/order/inputs_test_order.json b/sdk/cwl/tests/order/inputs_test_order.json
new file mode 100644 (file)
index 0000000..8830523
--- /dev/null
@@ -0,0 +1,9 @@
+{
+    "fileInput": {
+        "class": "File",
+        "path": "../input/blorp.txt"
+    },
+    "boolInput": true,
+    "floatInput": 1.234,
+    "optionalFloatInput": null
+}
index 5734821562015f429f197b47fd07547f8c07095a..432aba21ca9b35d8cda9e449abfd24fdc5840833 100644 (file)
@@ -2,12 +2,17 @@ import arvados
 import arvados.keep
 import arvados.collection
 import arvados_cwl
+import copy
+import cStringIO
 import functools
 import hashlib
 import mock
 import sys
 import unittest
 
+from .matcher import JsonDiffMatcher
+
+
 def stubs(func):
     @functools.wraps(func)
     @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@ -29,7 +34,9 @@ def stubs(func):
         stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
 
         stubs.api = mock.MagicMock()
-        stubs.api.users().current().execute.return_value = {"uuid": stubs.fake_user_uuid}
+        stubs.api.users().current().execute.return_value = {
+            "uuid": stubs.fake_user_uuid,
+        }
         stubs.api.collections().list().execute.return_value = {"items": []}
         stubs.api.collections().create().execute.side_effect = ({
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
@@ -38,12 +45,16 @@ def stubs(func):
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
             "portable_data_hash": "99999999999999999999999999999992+99",
         })
+        stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         stubs.api.jobs().create().execute.return_value = {
-            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+            "uuid": stubs.expect_job_uuid,
             "state": "Queued",
         }
+        stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+        stubs.api.pipeline_templates().create().execute.return_value = {
+            "uuid": stubs.expect_pipeline_template_uuid,
+        }
         stubs.expect_job_spec = {
-            'owner_uuid': stubs.fake_user_uuid,
             'runtime_constraints': {
                 'docker_image': 'arvados/jobs'
             },
@@ -52,7 +63,8 @@ def stubs(func):
                     'path': '99999999999999999999999999999992+99/blorp.txt',
                     'class': 'File'
                 },
-                'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                'cwl:tool':
+                '99999999999999999999999999999991+99/wf/submit_wf.cwl'
             },
             'repository': 'arvados',
             'script_version': 'master',
@@ -65,42 +77,175 @@ def stubs(func):
 class TestSubmit(unittest.TestCase):
     @stubs
     def test_submit(self, stubs):
-        arvados_cwl.main(
-            ["--debug", "--submit", "--no-wait",
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait",
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stdout, sys.stderr, api_client=stubs.api)
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
 
         stubs.api.collections().create.assert_has_calls([
             mock.call(),
             mock.call(body={
-                'manifest_text': './tool 84ec4df683711de31b782505389a8843+429 0:16:blub.txt 16:413:submit_tool.cwl\n./wf 81d977a245a41b8e79859fbe00623fd0+344 0:344:submit_wf.cwl\n',
+                'manifest_text':
+                './tool 84ec4df683711de31b782505389a8843+429 '
+                '0:16:blub.txt 16:413:submit_tool.cwl\n./wf '
+                '81d977a245a41b8e79859fbe00623fd0+344 0:344:submit_wf.cwl\n',
                 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
                 'name': 'submit_wf.cwl',
             }, ensure_unique_name=True),
             mock.call().execute(),
             mock.call(body={
-                'manifest_text': '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+                'manifest_text':
+                '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
                 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
                 'name': '#',
             }, ensure_unique_name=True),
             mock.call().execute()])
 
+        expect_job = copy.deepcopy(stubs.expect_job_spec)
+        expect_job["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.jobs().create.assert_called_with(
-            body=stubs.expect_job_spec,
+            body=expect_job,
             find_or_create=True)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_job_uuid + '\n')
 
     @stubs
     def test_submit_with_project_uuid(self, stubs):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
 
-        arvados_cwl.main(
-            ["--debug", "--submit", "--no-wait",
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait",
              "--project-uuid", project_uuid,
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             sys.stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
 
-        expect_body = stubs.expect_job_spec.copy()
+        expect_body = copy.deepcopy(stubs.expect_job_spec)
         expect_body["owner_uuid"] = project_uuid
         stubs.api.jobs().create.assert_called_with(
             body=expect_body,
             find_or_create=True)
+
+
+class TestCreateTemplate(unittest.TestCase):
+    @stubs
+    def test_create(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--create-template", "--no-wait",
+             "--project-uuid", project_uuid,
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        # The stubs setup calls create() with no arguments to configure
+        # return values; a call with arguments would be a real submission.
+        for call_args in stubs.api.pipeline_instances().create.call_args_list:
+            self.assertEqual(call_args, mock.call())
+        for call_args in stubs.api.jobs().create.call_args_list:
+            self.assertEqual(call_args, mock.call())
+
+        expect_component = copy.deepcopy(stubs.expect_job_spec)
+        expect_component['script_parameters']['x'] = {
+            'dataclass': 'File',
+            'required': True,
+            'type': 'File',
+            'value': '99999999999999999999999999999992+99/blorp.txt',
+        }
+        expect_template = {
+            "components": {
+                "submit_wf.cwl": expect_component,
+            },
+            "name": "submit_wf.cwl",
+            "owner_uuid": project_uuid,
+        }
+        stubs.api.pipeline_templates().create.assert_called_with(
+            body=JsonDiffMatcher(expect_template))
+
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_template_uuid + '\n')
+
+
+class TestTemplateInputs(unittest.TestCase):
+    expect_template = {
+        "components": {
+            "inputs_test.cwl": {
+                'runtime_constraints': {
+                    'docker_image': 'arvados/jobs',
+                },
+                'script_parameters': {
+                    'cwl:tool':
+                    '99999999999999999999999999999991+99/'
+                    'wf/inputs_test.cwl',
+                    'fileInput': {
+                        'type': 'File',
+                        'dataclass': 'File',
+                        'required': True,
+                        'title': "It's a file; we expect to find some characters in it.",
+                        'description': 'If there were anything further to say, it would be said here,\nor here.'
+                    },
+                    'floatInput': {
+                        'type': 'float',
+                        'dataclass': 'number',
+                        'required': True,
+                        'title': 'Floats like a duck',
+                        'default': 0.1,
+                        'value': 0.1,
+                    },
+                    'optionalFloatInput': {
+                        'type': ['null', 'float'],
+                        'dataclass': 'number',
+                        'required': False,
+                    },
+                    'boolInput': {
+                        'type': 'boolean',
+                        'dataclass': 'boolean',
+                        'required': True,
+                        'title': 'True or false?',
+                    },
+                },
+                'repository': 'arvados',
+                'script_version': 'master',
+                'script': 'cwl-runner',
+            },
+        },
+        "name": "inputs_test.cwl",
+    }
+
+    @stubs
+    def test_inputs_empty(self, stubs):
+        exited = arvados_cwl.main(
+            ["--create-template", "--no-wait",
+             "tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
+            cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        expect_template = copy.deepcopy(self.expect_template)
+        expect_template["owner_uuid"] = stubs.fake_user_uuid
+
+        stubs.api.pipeline_templates().create.assert_called_with(
+            body=JsonDiffMatcher(expect_template))
+
+    @stubs
+    def test_inputs(self, stubs):
+        exited = arvados_cwl.main(
+            ["--create-template", "--no-wait",
+             "tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
+            cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        expect_template = copy.deepcopy(self.expect_template)
+        expect_template["owner_uuid"] = stubs.fake_user_uuid
+        params = expect_template[
+            "components"]["inputs_test.cwl"]["script_parameters"]
+        params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
+        params["floatInput"]["value"] = 1.234
+        params["boolInput"]["value"] = True
+
+        stubs.api.pipeline_templates().create.assert_called_with(
+            body=JsonDiffMatcher(expect_template))
diff --git a/sdk/cwl/tests/wf/inputs_test.cwl b/sdk/cwl/tests/wf/inputs_test.cwl
new file mode 100644 (file)
index 0000000..91d4db0
--- /dev/null
@@ -0,0 +1,27 @@
+# Test case for arvados-cwl-runner. Used to test propagation of
+# various input types as script_parameters in pipeline templates.
+
+class: Workflow
+inputs:
+  - id: "#fileInput"
+    type: File
+    label: It's a file; we expect to find some characters in it.
+    description: |
+      If there were anything further to say, it would be said here,
+      or here.
+  - id: "#boolInput"
+    type: boolean
+    label: True or false?
+  - id: "#floatInput"
+    type: float
+    label: Floats like a duck
+    default: 0.1
+  - id: "#optionalFloatInput"
+    type: ["null", float]
+outputs: []
+steps:
+  - id: step1
+    inputs:
+      - { id: x, source: "#x" }
+    outputs: []
+    run: ../tool/submit_tool.cwl